X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2FFollowerLogInformation.java;h=f5c94fbf4cb26aaf8d61f42f7804b9d660d664fd;hb=3d2551546cd24632de113ad8c61d4d18b6e072ef;hp=9fd2edcb0e1d2dda3ad8e32c2ed6c6cfdca2ee8e;hpb=c983ca95187c03af54867343c8eeb8903e103ea8;p=controller.git diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformation.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformation.java index 9fd2edcb0e..f5c94fbf4c 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformation.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformation.java @@ -5,15 +5,17 @@ * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ - package org.opendaylight.controller.cluster.raft; +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkState; +import static java.util.Objects.requireNonNull; + import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; import com.google.common.base.Stopwatch; import java.util.concurrent.TimeUnit; -import javax.annotation.Nonnull; -import javax.annotation.Nullable; +import org.eclipse.jdt.annotation.NonNull; +import org.eclipse.jdt.annotation.Nullable; import org.opendaylight.controller.cluster.raft.behaviors.LeaderInstallSnapshotState; /** @@ -35,15 +37,14 @@ public final class FollowerLogInformation { private long lastReplicatedIndex = -1L; + private long sentCommitIndex = -1L; + private final Stopwatch lastReplicatedStopwatch = Stopwatch.createUnstarted(); private short payloadVersion = -1; - // Assume the HELIUM_VERSION version initially for backwards compatibility until we obtain the follower's - // actual version via AppendEntriesReply. Although we no longer support the Helium version, a pre-Boron - // follower will not have the version field in AppendEntriesReply so it will be set to 0 which is - // HELIUM_VERSION. - private short raftVersion = RaftVersions.HELIUM_VERSION; + // Assume the FLUORINE_VERSION version initially, as we no longer support pre-Fluorine versions. + private short raftVersion = RaftVersions.FLUORINE_VERSION; private final PeerInfo peerInfo; @@ -51,6 +52,8 @@ public final class FollowerLogInformation { private long slicedLogEntryIndex = NO_INDEX; + private boolean needsLeaderAddress; + /** * Constructs an instance. * @@ -60,10 +63,10 @@ public final class FollowerLogInformation { */ @VisibleForTesting FollowerLogInformation(final PeerInfo peerInfo, final long matchIndex, final RaftActorContext context) { - this.nextIndex = context.getCommitIndex(); + nextIndex = context.getCommitIndex(); this.matchIndex = matchIndex; this.context = context; - this.peerInfo = Preconditions.checkNotNull(peerInfo); + this.peerInfo = requireNonNull(peerInfo); } /** @@ -87,16 +90,23 @@ public final class FollowerLogInformation { } /** - * Decrements the value of the follower's next index. + * Decrements the value of the follower's next index, taking into account its reported last log index. * - * @return true if the next index was decremented, ie it was previously >= 0, false otherwise. + * @param followerLastIndex follower's last reported index. + * @return true if the next index was decremented, i.e. it was previously >= 0, false otherwise. */ - public boolean decrNextIndex() { + public boolean decrNextIndex(final long followerLastIndex) { if (nextIndex < 0) { return false; } - nextIndex--; + if (followerLastIndex >= 0 && nextIndex > followerLastIndex) { + // If the follower's last log index is lower than nextIndex, jump directly to it, so we converge + // on a common index more quickly. + nextIndex = followerLastIndex; + } else { + nextIndex--; + } return true; } @@ -215,10 +225,10 @@ public final class FollowerLogInformation { /** * Returns the time since the last activity occurred for the follower. * - * @return time in milliseconds since the last activity from the follower. + * @return time in nanoseconds since the last activity from the follower. */ - public long timeSinceLastActivity() { - return stopwatch.elapsed(TimeUnit.MILLISECONDS); + public long nanosSinceLastActivity() { + return stopwatch.elapsed(TimeUnit.NANOSECONDS); } /** @@ -226,15 +236,18 @@ public final class FollowerLogInformation { * sending duplicate message too frequently if the last replicate message was sent and no reply has been received * yet within the current heart beat interval * + * @param commitIndex current commitIndex * @return true if it is OK to replicate, false otherwise */ - public boolean okToReplicate() { + public boolean okToReplicate(final long commitIndex) { if (peerInfo.getVotingState() == VotingState.VOTING_NOT_INITIALIZED) { return false; } - // Return false if we are trying to send duplicate data before the heartbeat interval - if (getNextIndex() == lastReplicatedIndex && lastReplicatedStopwatch.elapsed(TimeUnit.MILLISECONDS) + // Return false if we are trying to send duplicate data before the heartbeat interval. This check includes + // also our commitIndex, as followers need to be told of new commitIndex as soon as possible. + if (getNextIndex() == lastReplicatedIndex && !hasStaleCommitIndex(commitIndex) + && lastReplicatedStopwatch.elapsed(TimeUnit.MILLISECONDS) < context.getConfigParams().getHeartBeatInterval().toMillis()) { return false; } @@ -284,6 +297,7 @@ public final class FollowerLogInformation { * @param raftVersion the raft version. */ public void setRaftVersion(final short raftVersion) { + checkArgument(raftVersion >= RaftVersions.FLUORINE_VERSION, "Unexpected version %s", raftVersion); this.raftVersion = raftVersion; } @@ -292,8 +306,7 @@ public final class FollowerLogInformation { * * @return the LeaderInstallSnapshotState if a snapshot install is in progress, null otherwise. */ - @Nullable - public LeaderInstallSnapshotState getInstallSnapshotState() { + public @Nullable LeaderInstallSnapshotState getInstallSnapshotState() { return installSnapshotState; } @@ -302,9 +315,9 @@ public final class FollowerLogInformation { * * @param state the LeaderInstallSnapshotState */ - public void setLeaderInstallSnapshotState(@Nonnull final LeaderInstallSnapshotState state) { - if (this.installSnapshotState == null) { - this.installSnapshotState = Preconditions.checkNotNull(state); + public void setLeaderInstallSnapshotState(final @NonNull LeaderInstallSnapshotState state) { + if (installSnapshotState == null) { + installSnapshotState = requireNonNull(state); } } @@ -312,7 +325,7 @@ public final class FollowerLogInformation { * Clears the LeaderInstallSnapshotState when an install snapshot is complete. */ public void clearLeaderInstallSnapshotState() { - Preconditions.checkState(installSnapshotState != null); + checkState(installSnapshotState != null); installSnapshotState.close(); installSnapshotState = null; } @@ -336,11 +349,28 @@ public final class FollowerLogInformation { return slicedLogEntryIndex != NO_INDEX; } + public void setNeedsLeaderAddress(final boolean value) { + needsLeaderAddress = value; + } + + public @Nullable String needsLeaderAddress(final String leaderId) { + return needsLeaderAddress ? context.getPeerAddress(leaderId) : null; + } + + public boolean hasStaleCommitIndex(final long commitIndex) { + return sentCommitIndex != commitIndex; + } + + public void setSentCommitIndex(final long commitIndex) { + sentCommitIndex = commitIndex; + } + @Override public String toString() { return "FollowerLogInformation [id=" + getId() + ", nextIndex=" + nextIndex + ", matchIndex=" + matchIndex - + ", lastReplicatedIndex=" + lastReplicatedIndex + ", votingState=" + peerInfo.getVotingState() - + ", stopwatch=" + stopwatch.elapsed(TimeUnit.MILLISECONDS) + ", followerTimeoutMillis=" - + context.getConfigParams().getElectionTimeOutInterval().toMillis() + "]"; + + ", lastReplicatedIndex=" + lastReplicatedIndex + ", commitIndex=" + sentCommitIndex + + ", votingState=" + peerInfo.getVotingState() + + ", stopwatch=" + stopwatch.elapsed(TimeUnit.MILLISECONDS) + + ", followerTimeoutMillis=" + context.getConfigParams().getElectionTimeOutInterval().toMillis() + "]"; } }