X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;ds=sidebyside;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2FFollowerLogInformation.java;h=a76d6a29c272db22c34ff66c23769384305f19e9;hb=615798c6573f1689068d6da14963112174c0702a;hp=3952b386b2498a6e2fecf56d6f5a1cb3f4dda8fc;hpb=1b1360ac337d23b9a586f62616eb278c3065eef0;p=controller.git diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformation.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformation.java index 3952b386b2..a76d6a29c2 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformation.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformation.java @@ -5,15 +5,16 @@ * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ - package org.opendaylight.controller.cluster.raft; +import static com.google.common.base.Preconditions.checkState; +import static java.util.Objects.requireNonNull; + import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; import com.google.common.base.Stopwatch; import java.util.concurrent.TimeUnit; -import javax.annotation.Nonnull; -import javax.annotation.Nullable; +import org.eclipse.jdt.annotation.NonNull; +import org.eclipse.jdt.annotation.Nullable; import org.opendaylight.controller.cluster.raft.behaviors.LeaderInstallSnapshotState; /** @@ -35,6 +36,8 @@ public final class FollowerLogInformation { private long lastReplicatedIndex = -1L; + private long sentCommitIndex = -1L; + private final Stopwatch lastReplicatedStopwatch = Stopwatch.createUnstarted(); private short payloadVersion = -1; @@ -51,6 +54,8 @@ public final class FollowerLogInformation { private long slicedLogEntryIndex = NO_INDEX; + private boolean needsLeaderAddress; + /** * Constructs an instance. * @@ -63,7 +68,7 @@ public final class FollowerLogInformation { this.nextIndex = context.getCommitIndex(); this.matchIndex = matchIndex; this.context = context; - this.peerInfo = Preconditions.checkNotNull(peerInfo); + this.peerInfo = requireNonNull(peerInfo); } /** @@ -87,16 +92,23 @@ public final class FollowerLogInformation { } /** - * Decrements the value of the follower's next index. + * Decrements the value of the follower's next index, taking into account its reported last log index. * - * @return true if the next index was decremented, ie it was previously >= 0, false otherwise. + * @param followerLastIndex follower's last reported index. + * @return true if the next index was decremented, i.e. it was previously >= 0, false otherwise. */ - public boolean decrNextIndex() { + public boolean decrNextIndex(final long followerLastIndex) { if (nextIndex < 0) { return false; } - nextIndex--; + if (followerLastIndex >= 0 && nextIndex > followerLastIndex) { + // If the follower's last log index is lower than nextIndex, jump directly to it, so we converge + // on a common index more quickly. + nextIndex = followerLastIndex; + } else { + nextIndex--; + } return true; } @@ -226,15 +238,18 @@ public final class FollowerLogInformation { * sending duplicate message too frequently if the last replicate message was sent and no reply has been received * yet within the current heart beat interval * + * @param commitIndex current commitIndex * @return true if it is OK to replicate, false otherwise */ - public boolean okToReplicate() { + public boolean okToReplicate(final long commitIndex) { if (peerInfo.getVotingState() == VotingState.VOTING_NOT_INITIALIZED) { return false; } - // Return false if we are trying to send duplicate data before the heartbeat interval - if (getNextIndex() == lastReplicatedIndex && lastReplicatedStopwatch.elapsed(TimeUnit.MILLISECONDS) + // Return false if we are trying to send duplicate data before the heartbeat interval. This check includes + // also our commitIndex, as followers need to be told of new commitIndex as soon as possible. + if (getNextIndex() == lastReplicatedIndex && !hasStaleCommitIndex(commitIndex) + && lastReplicatedStopwatch.elapsed(TimeUnit.MILLISECONDS) < context.getConfigParams().getHeartBeatInterval().toMillis()) { return false; } @@ -292,8 +307,7 @@ public final class FollowerLogInformation { * * @return the LeaderInstallSnapshotState if a snapshot install is in progress, null otherwise. */ - @Nullable - public LeaderInstallSnapshotState getInstallSnapshotState() { + public @Nullable LeaderInstallSnapshotState getInstallSnapshotState() { return installSnapshotState; } @@ -302,9 +316,9 @@ public final class FollowerLogInformation { * * @param state the LeaderInstallSnapshotState */ - public void setLeaderInstallSnapshotState(@Nonnull final LeaderInstallSnapshotState state) { + public void setLeaderInstallSnapshotState(final @NonNull LeaderInstallSnapshotState state) { if (this.installSnapshotState == null) { - this.installSnapshotState = Preconditions.checkNotNull(state); + this.installSnapshotState = requireNonNull(state); } } @@ -312,7 +326,7 @@ public final class FollowerLogInformation { * Clears the LeaderInstallSnapshotState when an install snapshot is complete. */ public void clearLeaderInstallSnapshotState() { - Preconditions.checkState(installSnapshotState != null); + checkState(installSnapshotState != null); installSnapshotState.close(); installSnapshotState = null; } @@ -336,11 +350,28 @@ public final class FollowerLogInformation { return slicedLogEntryIndex != NO_INDEX; } + public void setNeedsLeaderAddress(final boolean value) { + needsLeaderAddress = value; + } + + public @Nullable String needsLeaderAddress(final String leaderId) { + return needsLeaderAddress ? context.getPeerAddress(leaderId) : null; + } + + public boolean hasStaleCommitIndex(final long commitIndex) { + return sentCommitIndex != commitIndex; + } + + public void setSentCommitIndex(final long commitIndex) { + sentCommitIndex = commitIndex; + } + @Override public String toString() { return "FollowerLogInformation [id=" + getId() + ", nextIndex=" + nextIndex + ", matchIndex=" + matchIndex - + ", lastReplicatedIndex=" + lastReplicatedIndex + ", votingState=" + peerInfo.getVotingState() - + ", stopwatch=" + stopwatch.elapsed(TimeUnit.MILLISECONDS) + ", followerTimeoutMillis=" - + context.getConfigParams().getElectionTimeOutInterval().toMillis() + "]"; + + ", lastReplicatedIndex=" + lastReplicatedIndex + ", commitIndex=" + sentCommitIndex + + ", votingState=" + peerInfo.getVotingState() + + ", stopwatch=" + stopwatch.elapsed(TimeUnit.MILLISECONDS) + + ", followerTimeoutMillis=" + context.getConfigParams().getElectionTimeOutInterval().toMillis() + "]"; } }