* terms of the Eclipse Public License v1.0 which accompanies this distribution,
* and is available at http://www.eclipse.org/legal/epl-v10.html
*/
-
package org.opendaylight.controller.cluster.raft;
+import static com.google.common.base.Preconditions.checkState;
+import static java.util.Objects.requireNonNull;
+
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
import java.util.concurrent.TimeUnit;
-import javax.annotation.Nonnull;
-import javax.annotation.Nullable;
+import org.eclipse.jdt.annotation.NonNull;
+import org.eclipse.jdt.annotation.Nullable;
import org.opendaylight.controller.cluster.raft.behaviors.LeaderInstallSnapshotState;
/**
private long lastReplicatedIndex = -1L;
+ private long sentCommitIndex = -1L;
+
private final Stopwatch lastReplicatedStopwatch = Stopwatch.createUnstarted();
private short payloadVersion = -1;
this.nextIndex = context.getCommitIndex();
this.matchIndex = matchIndex;
this.context = context;
- this.peerInfo = Preconditions.checkNotNull(peerInfo);
+ this.peerInfo = requireNonNull(peerInfo);
}
/**
* sending duplicate message too frequently if the last replicate message was sent and no reply has been received
* yet within the current heart beat interval
*
+ * @param commitIndex current commitIndex
* @return true if it is OK to replicate, false otherwise
*/
- public boolean okToReplicate() {
+ public boolean okToReplicate(final long commitIndex) {
if (peerInfo.getVotingState() == VotingState.VOTING_NOT_INITIALIZED) {
return false;
}
- // Return false if we are trying to send duplicate data before the heartbeat interval
- if (getNextIndex() == lastReplicatedIndex && lastReplicatedStopwatch.elapsed(TimeUnit.MILLISECONDS)
+ // Return false if we are trying to send duplicate data before the heartbeat interval. This check includes
+ // also our commitIndex, as followers need to be told of new commitIndex as soon as possible.
+ if (getNextIndex() == lastReplicatedIndex && !hasStaleCommitIndex(commitIndex)
+ && lastReplicatedStopwatch.elapsed(TimeUnit.MILLISECONDS)
< context.getConfigParams().getHeartBeatInterval().toMillis()) {
return false;
}
*
* @return the LeaderInstallSnapshotState if a snapshot install is in progress, null otherwise.
*/
- @Nullable
- public LeaderInstallSnapshotState getInstallSnapshotState() {
+ public @Nullable LeaderInstallSnapshotState getInstallSnapshotState() {
return installSnapshotState;
}
*
* @param state the LeaderInstallSnapshotState
*/
- public void setLeaderInstallSnapshotState(@Nonnull final LeaderInstallSnapshotState state) {
+ public void setLeaderInstallSnapshotState(final @NonNull LeaderInstallSnapshotState state) {
if (this.installSnapshotState == null) {
- this.installSnapshotState = Preconditions.checkNotNull(state);
+ this.installSnapshotState = requireNonNull(state);
}
}
* Clears the LeaderInstallSnapshotState when an install snapshot is complete.
*/
public void clearLeaderInstallSnapshotState() {
- Preconditions.checkState(installSnapshotState != null);
+ checkState(installSnapshotState != null);
installSnapshotState.close();
installSnapshotState = null;
}
return slicedLogEntryIndex != NO_INDEX;
}
- public void setNeedsLeaderAddress(boolean value) {
+ public void setNeedsLeaderAddress(final boolean value) {
needsLeaderAddress = value;
}
- @Nullable
- public String needsLeaderAddress(String leaderId) {
+ public @Nullable String needsLeaderAddress(final String leaderId) {
return needsLeaderAddress ? context.getPeerAddress(leaderId) : null;
}
+ public boolean hasStaleCommitIndex(final long commitIndex) {
+ return sentCommitIndex != commitIndex;
+ }
+
+ public void setSentCommitIndex(final long commitIndex) {
+ sentCommitIndex = commitIndex;
+ }
+
@Override
public String toString() {
return "FollowerLogInformation [id=" + getId() + ", nextIndex=" + nextIndex + ", matchIndex=" + matchIndex
- + ", lastReplicatedIndex=" + lastReplicatedIndex + ", votingState=" + peerInfo.getVotingState()
- + ", stopwatch=" + stopwatch.elapsed(TimeUnit.MILLISECONDS) + ", followerTimeoutMillis="
- + context.getConfigParams().getElectionTimeOutInterval().toMillis() + "]";
+ + ", lastReplicatedIndex=" + lastReplicatedIndex + ", commitIndex=" + sentCommitIndex
+ + ", votingState=" + peerInfo.getVotingState()
+ + ", stopwatch=" + stopwatch.elapsed(TimeUnit.MILLISECONDS)
+ + ", followerTimeoutMillis=" + context.getConfigParams().getElectionTimeOutInterval().toMillis() + "]";
}
}