import akka.actor.ActorRef;
import com.google.common.base.Preconditions;
import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* The SyncStatusTracker tracks if a Follower is in sync with any given Leader or not
* sync if it is behind by 'syncThreshold' commits.
*/
public class SyncStatusTracker {
+ private static final Logger LOG = LoggerFactory.getLogger(SyncStatusTracker.class);
private static final boolean IN_SYNC = true;
private static final boolean NOT_IN_SYNC = false;
- private static final boolean FORCE_STATUS_CHANGE = true;
private final String id;
- private String syncedLeaderId = null;
private final ActorRef actor;
private final int syncThreshold;
- private boolean syncStatus = false;
+
+ // FIXME: what is this magic constant?
private long minimumExpectedIndex = -2L;
+ private String syncedLeaderId = null;
+ private boolean syncStatus = false;
- public SyncStatusTracker(ActorRef actor, String id, int syncThreshold) {
+ public SyncStatusTracker(final ActorRef actor, final String id, final int syncThreshold) {
this.actor = Preconditions.checkNotNull(actor, "actor should not be null");
this.id = Preconditions.checkNotNull(id, "id should not be null");
Preconditions.checkArgument(syncThreshold >= 0, "syncThreshold should be greater than or equal to 0");
this.syncThreshold = syncThreshold;
}
- public void update(String leaderId, long leaderCommit, long commitIndex) {
+ public void update(final String leaderId, final long leaderCommit, final long commitIndex) {
Preconditions.checkNotNull(leaderId, "leaderId should not be null");
if (!leaderId.equals(syncedLeaderId)) {
minimumExpectedIndex = leaderCommit;
- changeSyncStatus(NOT_IN_SYNC, FORCE_STATUS_CHANGE);
+ LOG.debug("Last sync leader {} does not match current leader {}, need to catch up to {}", syncedLeaderId,
+ leaderId, leaderCommit);
+ changeSyncStatus(NOT_IN_SYNC, true);
syncedLeaderId = leaderId;
return;
}
- if (leaderCommit - commitIndex > syncThreshold) {
- changeSyncStatus(NOT_IN_SYNC);
- } else if (leaderCommit - commitIndex <= syncThreshold && commitIndex >= minimumExpectedIndex) {
- changeSyncStatus(IN_SYNC);
+ final long lag = leaderCommit - commitIndex;
+ if (lag > syncThreshold) {
+ LOG.debug("Lagging {} entries behind leader {}", lag, leaderId);
+ changeSyncStatus(NOT_IN_SYNC, false);
+ } else if (commitIndex >= minimumExpectedIndex) {
+ LOG.debug("Lagging {} entries behind leader and reached {} (of expected {})", lag, leaderId, commitIndex,
+ minimumExpectedIndex);
+ changeSyncStatus(IN_SYNC, false);
}
}
- private void changeSyncStatus(boolean newSyncStatus) {
- changeSyncStatus(newSyncStatus, !FORCE_STATUS_CHANGE);
- }
-
- private void changeSyncStatus(boolean newSyncStatus, boolean forceStatusChange) {
- if (syncStatus == newSyncStatus && !forceStatusChange) {
- return;
+ private void changeSyncStatus(final boolean newSyncStatus, final boolean forceStatusChange) {
+ if (forceStatusChange || newSyncStatus != syncStatus) {
+ actor.tell(new FollowerInitialSyncUpStatus(newSyncStatus, id), ActorRef.noSender());
+ syncStatus = newSyncStatus;
+ } else {
+ LOG.trace("No change in sync status of {}, dampening message", actor);
}
- actor.tell(new FollowerInitialSyncUpStatus(newSyncStatus, id), ActorRef.noSender());
- syncStatus = newSyncStatus;
}
}