private long lastReplicatedIndex = -1L;
+ private long sentCommitIndex = -1L;
+
private final Stopwatch lastReplicatedStopwatch = Stopwatch.createUnstarted();
private short payloadVersion = -1;
* sending duplicate message too frequently if the last replicate message was sent and no reply has been received
* yet within the current heart beat interval
*
+ * @param commitIndex current commitIndex
* @return true if it is OK to replicate, false otherwise
*/
- public boolean okToReplicate() {
+ public boolean okToReplicate(final long commitIndex) {
if (peerInfo.getVotingState() == VotingState.VOTING_NOT_INITIALIZED) {
return false;
}
- // Return false if we are trying to send duplicate data before the heartbeat interval
- if (getNextIndex() == lastReplicatedIndex && lastReplicatedStopwatch.elapsed(TimeUnit.MILLISECONDS)
+ // Return false if we are trying to send duplicate data before the heartbeat interval. This check includes
+ // also our commitIndex, as followers need to be told of new commitIndex as soon as possible.
+ if (getNextIndex() == lastReplicatedIndex && !hasStaleCommitIndex(commitIndex)
+ && lastReplicatedStopwatch.elapsed(TimeUnit.MILLISECONDS)
< context.getConfigParams().getHeartBeatInterval().toMillis()) {
return false;
}
return slicedLogEntryIndex != NO_INDEX;
}
- public void setNeedsLeaderAddress(boolean value) {
+ public void setNeedsLeaderAddress(final boolean value) {
needsLeaderAddress = value;
}
- public @Nullable String needsLeaderAddress(String leaderId) {
+ public @Nullable String needsLeaderAddress(final String leaderId) {
return needsLeaderAddress ? context.getPeerAddress(leaderId) : null;
}
+ public boolean hasStaleCommitIndex(final long commitIndex) {
+ return sentCommitIndex != commitIndex;
+ }
+
+ public void setSentCommitIndex(final long commitIndex) {
+ sentCommitIndex = commitIndex;
+ }
+
@Override
public String toString() {
return "FollowerLogInformation [id=" + getId() + ", nextIndex=" + nextIndex + ", matchIndex=" + matchIndex
- + ", lastReplicatedIndex=" + lastReplicatedIndex + ", votingState=" + peerInfo.getVotingState()
- + ", stopwatch=" + stopwatch.elapsed(TimeUnit.MILLISECONDS) + ", followerTimeoutMillis="
- + context.getConfigParams().getElectionTimeOutInterval().toMillis() + "]";
+ + ", lastReplicatedIndex=" + lastReplicatedIndex + ", commitIndex=" + sentCommitIndex
+ + ", votingState=" + peerInfo.getVotingState()
+ + ", stopwatch=" + stopwatch.elapsed(TimeUnit.MILLISECONDS)
+ + ", followerTimeoutMillis=" + context.getConfigParams().getElectionTimeOutInterval().toMillis() + "]";
}
}