X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;ds=sidebyside;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2FFollowerLogInformation.java;h=f5c94fbf4cb26aaf8d61f42f7804b9d660d664fd;hb=HEAD;hp=a5f24990f6a54b4542bcc67dee35f2cdd4c56467;hpb=b4bf55727093657662d8c16a50fa85f87978a586;p=controller.git diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformation.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformation.java index a5f24990f6..f5c94fbf4c 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformation.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformation.java @@ -7,6 +7,7 @@ */ package org.opendaylight.controller.cluster.raft; +import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkState; import static java.util.Objects.requireNonNull; @@ -36,15 +37,14 @@ public final class FollowerLogInformation { private long lastReplicatedIndex = -1L; + private long sentCommitIndex = -1L; + private final Stopwatch lastReplicatedStopwatch = Stopwatch.createUnstarted(); private short payloadVersion = -1; - // Assume the HELIUM_VERSION version initially for backwards compatibility until we obtain the follower's - // actual version via AppendEntriesReply. Although we no longer support the Helium version, a pre-Boron - // follower will not have the version field in AppendEntriesReply so it will be set to 0 which is - // HELIUM_VERSION. - private short raftVersion = RaftVersions.HELIUM_VERSION; + // Assume the FLUORINE_VERSION version initially, as we no longer support pre-Fluorine versions. + private short raftVersion = RaftVersions.FLUORINE_VERSION; private final PeerInfo peerInfo; @@ -63,7 +63,7 @@ public final class FollowerLogInformation { */ @VisibleForTesting FollowerLogInformation(final PeerInfo peerInfo, final long matchIndex, final RaftActorContext context) { - this.nextIndex = context.getCommitIndex(); + nextIndex = context.getCommitIndex(); this.matchIndex = matchIndex; this.context = context; this.peerInfo = requireNonNull(peerInfo); @@ -236,15 +236,18 @@ public final class FollowerLogInformation { * sending duplicate message too frequently if the last replicate message was sent and no reply has been received * yet within the current heart beat interval * + * @param commitIndex current commitIndex * @return true if it is OK to replicate, false otherwise */ - public boolean okToReplicate() { + public boolean okToReplicate(final long commitIndex) { if (peerInfo.getVotingState() == VotingState.VOTING_NOT_INITIALIZED) { return false; } - // Return false if we are trying to send duplicate data before the heartbeat interval - if (getNextIndex() == lastReplicatedIndex && lastReplicatedStopwatch.elapsed(TimeUnit.MILLISECONDS) + // Return false if we are trying to send duplicate data before the heartbeat interval. This check includes + // also our commitIndex, as followers need to be told of new commitIndex as soon as possible. + if (getNextIndex() == lastReplicatedIndex && !hasStaleCommitIndex(commitIndex) + && lastReplicatedStopwatch.elapsed(TimeUnit.MILLISECONDS) < context.getConfigParams().getHeartBeatInterval().toMillis()) { return false; } @@ -294,6 +297,7 @@ public final class FollowerLogInformation { * @param raftVersion the raft version. */ public void setRaftVersion(final short raftVersion) { + checkArgument(raftVersion >= RaftVersions.FLUORINE_VERSION, "Unexpected version %s", raftVersion); this.raftVersion = raftVersion; } @@ -312,8 +316,8 @@ public final class FollowerLogInformation { * @param state the LeaderInstallSnapshotState */ public void setLeaderInstallSnapshotState(final @NonNull LeaderInstallSnapshotState state) { - if (this.installSnapshotState == null) { - this.installSnapshotState = requireNonNull(state); + if (installSnapshotState == null) { + installSnapshotState = requireNonNull(state); } } @@ -345,19 +349,28 @@ public final class FollowerLogInformation { return slicedLogEntryIndex != NO_INDEX; } - public void setNeedsLeaderAddress(boolean value) { + public void setNeedsLeaderAddress(final boolean value) { needsLeaderAddress = value; } - public @Nullable String needsLeaderAddress(String leaderId) { + public @Nullable String needsLeaderAddress(final String leaderId) { return needsLeaderAddress ? context.getPeerAddress(leaderId) : null; } + public boolean hasStaleCommitIndex(final long commitIndex) { + return sentCommitIndex != commitIndex; + } + + public void setSentCommitIndex(final long commitIndex) { + sentCommitIndex = commitIndex; + } + @Override public String toString() { return "FollowerLogInformation [id=" + getId() + ", nextIndex=" + nextIndex + ", matchIndex=" + matchIndex - + ", lastReplicatedIndex=" + lastReplicatedIndex + ", votingState=" + peerInfo.getVotingState() - + ", stopwatch=" + stopwatch.elapsed(TimeUnit.MILLISECONDS) + ", followerTimeoutMillis=" - + context.getConfigParams().getElectionTimeOutInterval().toMillis() + "]"; + + ", lastReplicatedIndex=" + lastReplicatedIndex + ", commitIndex=" + sentCommitIndex + + ", votingState=" + peerInfo.getVotingState() + + ", stopwatch=" + stopwatch.elapsed(TimeUnit.MILLISECONDS) + + ", followerTimeoutMillis=" + context.getConfigParams().getElectionTimeOutInterval().toMillis() + "]"; } }