package org.opendaylight.controller.cluster.raft.behaviors;
import akka.actor.ActorRef;
+import akka.japi.Procedure;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
import java.util.ArrayList;
import org.opendaylight.controller.cluster.raft.RaftActorContext;
import org.opendaylight.controller.cluster.raft.RaftState;
import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
+import org.opendaylight.controller.cluster.raft.ServerConfigurationPayload;
import org.opendaylight.controller.cluster.raft.Snapshot;
import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
* </ul>
*/
public class Follower extends AbstractRaftActorBehavior {
-
- private SnapshotTracker snapshotTracker = null;
+ private static final int SYNC_THRESHOLD = 10;
private final SyncStatusTracker initialSyncStatusTracker;
- private static final int SYNC_THRESHOLD = 10;
+ private final Procedure<ReplicatedLogEntry> appendAndPersistCallback = new Procedure<ReplicatedLogEntry>() {
+ @Override
+ public void apply(ReplicatedLogEntry logEntry) {
+ context.getReplicatedLog().captureSnapshotIfReady(logEntry);
+ }
+ };
+
+ private SnapshotTracker snapshotTracker = null;
+ private String leaderId;
+ private short leaderPayloadVersion;
public Follower(RaftActorContext context) {
+ this(context, null, (short)-1);
+ }
+
+ public Follower(RaftActorContext context, String initialLeaderId, short initialLeaderPayloadVersion) {
super(context, RaftState.Follower);
+ this.leaderId = initialLeaderId;
+ this.leaderPayloadVersion = initialLeaderPayloadVersion;
initialSyncStatusTracker = new SyncStatusTracker(context.getActor(), getId(), SYNC_THRESHOLD);
- if(context.getRaftPolicy().automaticElectionsEnabled()) {
- if (context.getPeerAddresses().isEmpty()) {
- actor().tell(ELECTION_TIMEOUT, actor());
+ if(canStartElection()) {
+ if (context.getPeerIds().isEmpty() && getLeaderId() == null) {
+ actor().tell(ElectionTimeout.INSTANCE, actor());
} else {
scheduleElection(electionDuration());
}
}
+ }
+ @Override
+ public final String getLeaderId() {
+ return leaderId;
+ }
+
+ @VisibleForTesting
+ protected final void setLeaderId(final String leaderId) {
+ this.leaderId = Preconditions.checkNotNull(leaderId);
+ }
+
+ @Override
+ public short getLeaderPayloadVersion() {
+ return leaderPayloadVersion;
+ }
+
+ @VisibleForTesting
+ protected final void setLeaderPayloadVersion(short leaderPayloadVersion) {
+ this.leaderPayloadVersion = leaderPayloadVersion;
}
private boolean isLogEntryPresent(long index){
initialSyncStatusTracker.update(leaderId, currentLeaderCommit, context.getCommitIndex());
}
- @Override protected RaftActorBehavior handleAppendEntries(ActorRef sender,
- AppendEntries appendEntries) {
+ @Override
+ protected RaftActorBehavior handleAppendEntries(ActorRef sender, AppendEntries appendEntries) {
int numLogEntries = appendEntries.getEntries() != null ? appendEntries.getEntries().size() : 0;
if(LOG.isTraceEnabled()) {
// to make it easier to read. Before refactoring ensure tests
// cover the code properly
- if (snapshotTracker != null) {
+ if (snapshotTracker != null || context.getSnapshotManager().isApplying()) {
// if snapshot install is in progress, follower should just acknowledge append entries with a reply.
AppendEntriesReply reply = new AppendEntriesReply(context.getId(), currentTerm(), true,
lastIndex(), lastTerm(), context.getPayloadVersion());
// If we got here then we do appear to be talking to the leader
leaderId = appendEntries.getLeaderId();
-
- setLeaderPayloadVersion(appendEntries.getPayloadVersion());
+ leaderPayloadVersion = appendEntries.getPayloadVersion();
updateInitialSyncStatus(appendEntries.getLeaderCommit(), appendEntries.getLeaderId());
// First check if the logs are in sync or not
LOG.debug("{}: Append entry to log {}", logName(), entry.getData());
- context.getReplicatedLog().appendAndPersist(entry);
+ context.getReplicatedLog().appendAndPersist(entry, appendAndPersistCallback);
+
+ if(entry.getData() instanceof ServerConfigurationPayload) {
+ context.updatePeerIds((ServerConfigurationPayload)entry.getData());
+ }
}
LOG.debug("{}: Log size is now {}", logName(), context.getReplicatedLog().size());
logName(), prevLogTerm, appendEntries.getPrevLogTerm());
} else if(appendEntries.getPrevLogIndex() == -1 && appendEntries.getPrevLogTerm() == -1
&& appendEntries.getReplicatedToAllIndex() != -1
- && !isLogEntryPresent(appendEntries.getReplicatedToAllIndex())) {
+ && !isLogEntryPresent(appendEntries.getReplicatedToAllIndex())
+ && !context.getReplicatedLog().isInSnapshot(appendEntries.getReplicatedToAllIndex())) {
// This append entry comes from a leader who has it's log aggressively trimmed and so does not have
// the previous entry in it's in-memory journal
"{}: Cannot append entries because the replicatedToAllIndex {} does not appear to be in the in-memory journal",
logName(), appendEntries.getReplicatedToAllIndex());
} else if(appendEntries.getPrevLogIndex() == -1 && appendEntries.getPrevLogTerm() == -1
- && appendEntries.getReplicatedToAllIndex() != -1 && numLogEntries > 0 &&
- !isLogEntryPresent(appendEntries.getEntries().get(0).getIndex() - 1)){
+ && appendEntries.getReplicatedToAllIndex() != -1 && numLogEntries > 0
+ && !isLogEntryPresent(appendEntries.getEntries().get(0).getIndex() - 1)
+ && !context.getReplicatedLog().isInSnapshot(appendEntries.getEntries().get(0).getIndex() - 1)) {
LOG.debug(
"{}: Cannot append entries because the calculated previousIndex {} was not found in the in-memory journal",
logName(), appendEntries.getEntries().get(0).getIndex() - 1);
return outOfSync;
}
- @Override protected RaftActorBehavior handleAppendEntriesReply(ActorRef sender,
+ @Override
+ protected RaftActorBehavior handleAppendEntriesReply(ActorRef sender,
AppendEntriesReply appendEntriesReply) {
return this;
}
- @Override protected RaftActorBehavior handleRequestVoteReply(ActorRef sender,
+ @Override
+ protected RaftActorBehavior handleRequestVoteReply(ActorRef sender,
RequestVoteReply requestVoteReply) {
return this;
}
- @Override public RaftActorBehavior handleMessage(ActorRef sender, Object originalMessage) {
-
- Object message = fromSerializableMessage(originalMessage);
+ @Override
+ public RaftActorBehavior handleMessage(ActorRef sender, Object originalMessage) {
+ if (originalMessage instanceof ElectionTimeout) {
+ if (canStartElection()) {
+ LOG.debug("{}: Received ElectionTimeout - switching to Candidate", logName());
+ return internalSwitchBehavior(RaftState.Candidate);
+ } else {
+ return this;
+ }
+ }
+ final Object message = fromSerializableMessage(originalMessage);
if (message instanceof RaftRPC) {
RaftRPC rpc = (RaftRPC) message;
// If RPC request or response contains term T > currentTerm:
}
}
- if (message instanceof ElectionTimeout) {
- LOG.debug("{}: Received ElectionTimeout - switching to Candidate", logName());
- return internalSwitchBehavior(RaftState.Candidate);
-
- } else if (message instanceof InstallSnapshot) {
+ if (message instanceof InstallSnapshot) {
InstallSnapshot installSnapshot = (InstallSnapshot) message;
handleInstallSnapshot(sender, installSnapshot);
}
- if(message instanceof RaftRPC && (!(message instanceof RequestVote) || (canGrantVote((RequestVote) message)))){
+ if (message instanceof RaftRPC && (!(message instanceof RequestVote) || (canGrantVote((RequestVote) message)))){
scheduleElection(electionDuration());
}
return super.handleMessage(sender, message);
}
- private void handleInstallSnapshot(ActorRef sender, InstallSnapshot installSnapshot) {
+ private void handleInstallSnapshot(final ActorRef sender, InstallSnapshot installSnapshot) {
- LOG.debug("{}: InstallSnapshot received from leader {}, datasize: {} , Chunk: {}/{}",
- logName(), installSnapshot.getLeaderId(), installSnapshot.getData().size(),
- installSnapshot.getChunkIndex(), installSnapshot.getTotalChunks());
+ LOG.debug("{}: handleInstallSnapshot: {}", logName(), installSnapshot);
+
+ leaderId = installSnapshot.getLeaderId();
if(snapshotTracker == null){
snapshotTracker = new SnapshotTracker(LOG, installSnapshot.getTotalChunks());
updateInitialSyncStatus(installSnapshot.getLastIncludedIndex(), installSnapshot.getLeaderId());
try {
+ final InstallSnapshotReply reply = new InstallSnapshotReply(
+ currentTerm(), context.getId(), installSnapshot.getChunkIndex(), true);
+
if(snapshotTracker.addChunk(installSnapshot.getChunkIndex(), installSnapshot.getData(),
installSnapshot.getLastChunkHashCode())){
Snapshot snapshot = Snapshot.create(snapshotTracker.getSnapshot(),
installSnapshot.getLastIncludedIndex(),
installSnapshot.getLastIncludedTerm(),
installSnapshot.getLastIncludedIndex(),
- installSnapshot.getLastIncludedTerm());
-
- actor().tell(new ApplySnapshot(snapshot), actor());
+ installSnapshot.getLastIncludedTerm(),
+ context.getTermInformation().getCurrentTerm(),
+ context.getTermInformation().getVotedFor(),
+ context.getPeerServerInfo(true));
- snapshotTracker = null;
+ ApplySnapshot.Callback applySnapshotCallback = new ApplySnapshot.Callback() {
+ @Override
+ public void onSuccess() {
+ LOG.debug("{}: handleInstallSnapshot returning: {}", logName(), reply);
- }
+ sender.tell(reply, actor());
+ }
- InstallSnapshotReply reply = new InstallSnapshotReply(
- currentTerm(), context.getId(), installSnapshot.getChunkIndex(), true);
+ @Override
+ public void onFailure() {
+ sender.tell(new InstallSnapshotReply(currentTerm(), context.getId(), -1, false), actor());
+ }
+ };
- LOG.debug("{}: handleInstallSnapshot returning: {}", logName(), reply);
+ actor().tell(new ApplySnapshot(snapshot, applySnapshotCallback), actor());
- sender.tell(reply, actor());
+ snapshotTracker = null;
+ } else {
+ LOG.debug("{}: handleInstallSnapshot returning: {}", logName(), reply);
+ sender.tell(reply, actor());
+ }
} catch (SnapshotTracker.InvalidChunkException e) {
LOG.debug("{}: Exception in InstallSnapshot of follower", logName(), e);
}
@Override
- public void close() throws Exception {
+ public void close() {
stopElection();
}