package org.opendaylight.controller.cluster.raft.behaviors;
import akka.actor.ActorRef;
+import akka.japi.Procedure;
import com.google.common.annotations.VisibleForTesting;
import java.util.ArrayList;
import org.opendaylight.controller.cluster.raft.RaftActorContext;
* </ul>
*/
public class Follower extends AbstractRaftActorBehavior {
-
- private SnapshotTracker snapshotTracker = null;
+ private static final int SYNC_THRESHOLD = 10;
private final SyncStatusTracker initialSyncStatusTracker;
- private static final int SYNC_THRESHOLD = 10;
+ private final Procedure<ReplicatedLogEntry> appendAndPersistCallback = new Procedure<ReplicatedLogEntry>() {
+ @Override
+ public void apply(ReplicatedLogEntry logEntry) {
+ context.getReplicatedLog().captureSnapshotIfReady(logEntry);
+ }
+ };
+
+ private SnapshotTracker snapshotTracker = null;
public Follower(RaftActorContext context) {
- this(context, null);
+ this(context, null, (short)-1);
}
- public Follower(RaftActorContext context, String initialLeaderId) {
+ public Follower(RaftActorContext context, String initialLeaderId, short initialLeaderPayloadVersion) {
super(context, RaftState.Follower);
leaderId = initialLeaderId;
+ setLeaderPayloadVersion(initialLeaderPayloadVersion);
initialSyncStatusTracker = new SyncStatusTracker(context.getActor(), getId(), SYNC_THRESHOLD);
if(canStartElection()) {
if (context.getPeerIds().isEmpty() && getLeaderId() == null) {
- actor().tell(ELECTION_TIMEOUT, actor());
+ actor().tell(ElectionTimeout.INSTANCE, actor());
} else {
scheduleElection(electionDuration());
}
LOG.debug("{}: Append entry to log {}", logName(), entry.getData());
- context.getReplicatedLog().appendAndPersist(entry);
+ context.getReplicatedLog().appendAndPersist(entry, appendAndPersistCallback);
if(entry.getData() instanceof ServerConfigurationPayload) {
context.updatePeerIds((ServerConfigurationPayload)entry.getData());
private void handleInstallSnapshot(final ActorRef sender, InstallSnapshot installSnapshot) {
- LOG.debug("{}: InstallSnapshot received from leader {}, datasize: {} , Chunk: {}/{}",
- logName(), installSnapshot.getLeaderId(), installSnapshot.getData().size(),
- installSnapshot.getChunkIndex(), installSnapshot.getTotalChunks());
+ LOG.debug("{}: handleInstallSnapshot: {}", logName(), installSnapshot);
leaderId = installSnapshot.getLeaderId();