X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2Fbehaviors%2FSyncStatusTracker.java;h=2b700ffc43c1ec80cce36f2ab4293cfaebc09547;hb=9b319f491af1c65705b69e8a182aab5006a2f959;hp=0986565bac2748b94037a7a19f3f4c6a2912d651;hpb=660c3e22ca97bc613ea6f6288503620bba6fb233;p=controller.git diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/SyncStatusTracker.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/SyncStatusTracker.java index 0986565bac..2b700ffc43 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/SyncStatusTracker.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/SyncStatusTracker.java @@ -5,12 +5,15 @@ * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ - package org.opendaylight.controller.cluster.raft.behaviors; +import static com.google.common.base.Preconditions.checkArgument; +import static java.util.Objects.requireNonNull; + import akka.actor.ActorRef; -import com.google.common.base.Preconditions; import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * The SyncStatusTracker tracks if a Follower is in sync with any given Leader or not @@ -21,51 +24,63 @@ import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyn * sync if it is behind by 'syncThreshold' commits. */ public class SyncStatusTracker { + private static final class LeaderInfo { + final long minimumCommitIndex; + final String leaderId; + + LeaderInfo(final String leaderId, final long minimumCommitIndex) { + this.leaderId = requireNonNull(leaderId); + this.minimumCommitIndex = minimumCommitIndex; + } + } + + private static final Logger LOG = LoggerFactory.getLogger(SyncStatusTracker.class); private static final boolean IN_SYNC = true; private static final boolean NOT_IN_SYNC = false; - private static final boolean FORCE_STATUS_CHANGE = true; - private final String id; - private String syncedLeaderId = null; + private final long syncThreshold; private final ActorRef actor; - private final int syncThreshold; - private boolean syncStatus = false; - private long minimumExpectedIndex = -2L; + private final String id; + + private LeaderInfo syncTarget; + private boolean syncStatus; - public SyncStatusTracker(ActorRef actor, String id, int syncThreshold) { - this.actor = Preconditions.checkNotNull(actor, "actor should not be null"); - this.id = Preconditions.checkNotNull(id, "id should not be null"); - Preconditions.checkArgument(syncThreshold >= 0, "syncThreshold should be greater than or equal to 0"); + public SyncStatusTracker(final ActorRef actor, final String id, final long syncThreshold) { + this.actor = requireNonNull(actor, "actor should not be null"); + this.id = requireNonNull(id, "id should not be null"); + checkArgument(syncThreshold >= 0, "syncThreshold should be greater than or equal to 0"); this.syncThreshold = syncThreshold; } - public void update(String leaderId, long leaderCommit, long commitIndex) { - Preconditions.checkNotNull(leaderId, "leaderId should not be null"); + public void update(final String leaderId, final long leaderCommit, final long commitIndex) { + requireNonNull(leaderId, "leaderId should not be null"); - if (!leaderId.equals(syncedLeaderId)) { - minimumExpectedIndex = leaderCommit; - changeSyncStatus(NOT_IN_SYNC, FORCE_STATUS_CHANGE); - syncedLeaderId = leaderId; + if (syncTarget == null || !leaderId.equals(syncTarget.leaderId)) { + LOG.debug("{}: Last sync leader does not match current leader {}, need to catch up to {}", id, + leaderId, leaderCommit); + changeSyncStatus(NOT_IN_SYNC, true); + syncTarget = new LeaderInfo(leaderId, leaderCommit); return; } - if (leaderCommit - commitIndex > syncThreshold) { - changeSyncStatus(NOT_IN_SYNC); - } else if (leaderCommit - commitIndex <= syncThreshold && commitIndex >= minimumExpectedIndex) { - changeSyncStatus(IN_SYNC); + final long lag = leaderCommit - commitIndex; + if (lag > syncThreshold) { + LOG.debug("{}: Lagging {} entries behind leader {}", id, lag, leaderId); + changeSyncStatus(NOT_IN_SYNC, false); + } else if (commitIndex >= syncTarget.minimumCommitIndex) { + LOG.debug("{}: Lagging {} entries behind leader {} and reached {} (of expected {})", id, lag, leaderId, + commitIndex, syncTarget.minimumCommitIndex); + changeSyncStatus(IN_SYNC, false); } } - private void changeSyncStatus(boolean newSyncStatus) { - changeSyncStatus(newSyncStatus, !FORCE_STATUS_CHANGE); - } - - private void changeSyncStatus(boolean newSyncStatus, boolean forceStatusChange) { - if (syncStatus == newSyncStatus && !forceStatusChange) { - return; + private void changeSyncStatus(final boolean newSyncStatus, final boolean forceStatusChange) { + if (forceStatusChange || newSyncStatus != syncStatus) { + actor.tell(new FollowerInitialSyncUpStatus(newSyncStatus, id), ActorRef.noSender()); + syncStatus = newSyncStatus; + } else { + LOG.trace("{}: No change in sync status of, dampening message", id); } - actor.tell(new FollowerInitialSyncUpStatus(newSyncStatus, id), ActorRef.noSender()); - syncStatus = newSyncStatus; } }