* terms of the Eclipse Public License v1.0 which accompanies this distribution,
* and is available at http://www.eclipse.org/legal/epl-v10.html
*/
-
package org.opendaylight.controller.cluster.raft;
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
+import static java.util.Objects.requireNonNull;
+
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
import java.util.concurrent.TimeUnit;
-import javax.annotation.Nonnull;
-import javax.annotation.Nullable;
+import org.eclipse.jdt.annotation.NonNull;
+import org.eclipse.jdt.annotation.Nullable;
import org.opendaylight.controller.cluster.raft.behaviors.LeaderInstallSnapshotState;
/**
private long lastReplicatedIndex = -1L;
+ private long sentCommitIndex = -1L;
+
private final Stopwatch lastReplicatedStopwatch = Stopwatch.createUnstarted();
private short payloadVersion = -1;
- // Assume the HELIUM_VERSION version initially for backwards compatibility until we obtain the follower's
- // actual version via AppendEntriesReply. Although we no longer support the Helium version, a pre-Boron
- // follower will not have the version field in AppendEntriesReply so it will be set to 0 which is
- // HELIUM_VERSION.
- private short raftVersion = RaftVersions.HELIUM_VERSION;
+ // Assume the FLUORINE_VERSION version initially, as we no longer support pre-Fluorine versions.
+ private short raftVersion = RaftVersions.FLUORINE_VERSION;
private final PeerInfo peerInfo;
private long slicedLogEntryIndex = NO_INDEX;
+ private boolean needsLeaderAddress;
+
/**
* Constructs an instance.
*
*/
@VisibleForTesting
FollowerLogInformation(final PeerInfo peerInfo, final long matchIndex, final RaftActorContext context) {
- this.nextIndex = context.getCommitIndex();
+ nextIndex = context.getCommitIndex();
this.matchIndex = matchIndex;
this.context = context;
- this.peerInfo = Preconditions.checkNotNull(peerInfo);
+ this.peerInfo = requireNonNull(peerInfo);
}
/**
}
/**
- * Decrements the value of the follower's next index.
+ * Decrements the value of the follower's next index, taking into account its reported last log index.
*
- * @return true if the next index was decremented, ie it was previously >= 0, false otherwise.
+ * @param followerLastIndex follower's last reported index.
+ * @return true if the next index was decremented, i.e. it was previously >= 0, false otherwise.
*/
- public boolean decrNextIndex() {
+ public boolean decrNextIndex(final long followerLastIndex) {
if (nextIndex < 0) {
return false;
}
- nextIndex--;
+ if (followerLastIndex >= 0 && nextIndex > followerLastIndex) {
+ // If the follower's last log index is lower than nextIndex, jump directly to it, so we converge
+ // on a common index more quickly.
+ nextIndex = followerLastIndex;
+ } else {
+ nextIndex--;
+ }
return true;
}
/**
* Returns the time since the last activity occurred for the follower.
*
- * @return time in milliseconds since the last activity from the follower.
+ * @return time in nanoseconds since the last activity from the follower.
*/
- public long timeSinceLastActivity() {
- return stopwatch.elapsed(TimeUnit.MILLISECONDS);
+ public long nanosSinceLastActivity() {
+ return stopwatch.elapsed(TimeUnit.NANOSECONDS);
}
/**
* sending duplicate message too frequently if the last replicate message was sent and no reply has been received
* yet within the current heart beat interval
*
+ * @param commitIndex current commitIndex
* @return true if it is OK to replicate, false otherwise
*/
- public boolean okToReplicate() {
+ public boolean okToReplicate(final long commitIndex) {
if (peerInfo.getVotingState() == VotingState.VOTING_NOT_INITIALIZED) {
return false;
}
- // Return false if we are trying to send duplicate data before the heartbeat interval
- if (getNextIndex() == lastReplicatedIndex && lastReplicatedStopwatch.elapsed(TimeUnit.MILLISECONDS)
+ // Return false if we are trying to send duplicate data before the heartbeat interval. This check includes
+ // also our commitIndex, as followers need to be told of new commitIndex as soon as possible.
+ if (getNextIndex() == lastReplicatedIndex && !hasStaleCommitIndex(commitIndex)
+ && lastReplicatedStopwatch.elapsed(TimeUnit.MILLISECONDS)
< context.getConfigParams().getHeartBeatInterval().toMillis()) {
return false;
}
* @param raftVersion the raft version.
*/
public void setRaftVersion(final short raftVersion) {
+ checkArgument(raftVersion >= RaftVersions.FLUORINE_VERSION, "Unexpected version %s", raftVersion);
this.raftVersion = raftVersion;
}
*
* @return the LeaderInstallSnapshotState if a snapshot install is in progress, null otherwise.
*/
- @Nullable
- public LeaderInstallSnapshotState getInstallSnapshotState() {
+ public @Nullable LeaderInstallSnapshotState getInstallSnapshotState() {
return installSnapshotState;
}
*
* @param state the LeaderInstallSnapshotState
*/
- public void setLeaderInstallSnapshotState(@Nonnull final LeaderInstallSnapshotState state) {
- if (this.installSnapshotState == null) {
- this.installSnapshotState = Preconditions.checkNotNull(state);
+ public void setLeaderInstallSnapshotState(final @NonNull LeaderInstallSnapshotState state) {
+ if (installSnapshotState == null) {
+ installSnapshotState = requireNonNull(state);
}
}
* Clears the LeaderInstallSnapshotState when an install snapshot is complete.
*/
public void clearLeaderInstallSnapshotState() {
- Preconditions.checkState(installSnapshotState != null);
+ checkState(installSnapshotState != null);
installSnapshotState.close();
installSnapshotState = null;
}
return slicedLogEntryIndex != NO_INDEX;
}
+ public void setNeedsLeaderAddress(final boolean value) {
+ needsLeaderAddress = value;
+ }
+
+ public @Nullable String needsLeaderAddress(final String leaderId) {
+ return needsLeaderAddress ? context.getPeerAddress(leaderId) : null;
+ }
+
+ public boolean hasStaleCommitIndex(final long commitIndex) {
+ return sentCommitIndex != commitIndex;
+ }
+
+ public void setSentCommitIndex(final long commitIndex) {
+ sentCommitIndex = commitIndex;
+ }
+
@Override
public String toString() {
return "FollowerLogInformation [id=" + getId() + ", nextIndex=" + nextIndex + ", matchIndex=" + matchIndex
- + ", lastReplicatedIndex=" + lastReplicatedIndex + ", votingState=" + peerInfo.getVotingState()
- + ", stopwatch=" + stopwatch.elapsed(TimeUnit.MILLISECONDS) + ", followerTimeoutMillis="
- + context.getConfigParams().getElectionTimeOutInterval().toMillis() + "]";
+ + ", lastReplicatedIndex=" + lastReplicatedIndex + ", commitIndex=" + sentCommitIndex
+ + ", votingState=" + peerInfo.getVotingState()
+ + ", stopwatch=" + stopwatch.elapsed(TimeUnit.MILLISECONDS)
+ + ", followerTimeoutMillis=" + context.getConfigParams().getElectionTimeOutInterval().toMillis() + "]";
}
}