Send commitIndex updates to followers as soon as possible
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / FollowerLogInformation.java
index ca3de5944b6c4c9c732230d495f12dfefd57bd42..92b522c8e8e2a03e4f37e82a032c674685668fb0 100644 (file)
@@ -35,6 +35,8 @@ public final class FollowerLogInformation {
 
     private long lastReplicatedIndex = -1L;
 
+    private long sentCommitIndex = -1L;
+
     private final Stopwatch lastReplicatedStopwatch = Stopwatch.createUnstarted();
 
     private short payloadVersion = -1;
@@ -235,15 +237,18 @@ public final class FollowerLogInformation {
      * sending duplicate message too frequently if the last replicate message was sent and no reply has been received
      * yet within the current heart beat interval
      *
+     * @param commitIndex current commitIndex
      * @return true if it is OK to replicate, false otherwise
      */
-    public boolean okToReplicate() {
+    public boolean okToReplicate(final long commitIndex) {
         if (peerInfo.getVotingState() == VotingState.VOTING_NOT_INITIALIZED) {
             return false;
         }
 
-        // Return false if we are trying to send duplicate data before the heartbeat interval
-        if (getNextIndex() == lastReplicatedIndex && lastReplicatedStopwatch.elapsed(TimeUnit.MILLISECONDS)
+        // Return false if we are trying to send duplicate data before the heartbeat interval. This check includes
+        // also our commitIndex, as followers need to be told of new commitIndex as soon as possible.
+        if (getNextIndex() == lastReplicatedIndex && !hasStaleCommitIndex(commitIndex)
+                && lastReplicatedStopwatch.elapsed(TimeUnit.MILLISECONDS)
                 < context.getConfigParams().getHeartBeatInterval().toMillis()) {
             return false;
         }
@@ -345,20 +350,29 @@ public final class FollowerLogInformation {
         return slicedLogEntryIndex != NO_INDEX;
     }
 
-    public void setNeedsLeaderAddress(boolean value) {
+    public void setNeedsLeaderAddress(final boolean value) {
         needsLeaderAddress = value;
     }
 
+    public boolean hasStaleCommitIndex(final long commitIndex) {
+        return sentCommitIndex != commitIndex;
+    }
+
+    public void setSentCommitIndex(final long commitIndex) {
+        sentCommitIndex = commitIndex;
+    }
+
     @Nullable
-    public String needsLeaderAddress(String leaderId) {
+    public String needsLeaderAddress(final String leaderId) {
         return needsLeaderAddress ? context.getPeerAddress(leaderId) : null;
     }
 
     @Override
     public String toString() {
         return "FollowerLogInformation [id=" + getId() + ", nextIndex=" + nextIndex + ", matchIndex=" + matchIndex
-                + ", lastReplicatedIndex=" + lastReplicatedIndex + ", votingState=" + peerInfo.getVotingState()
-                + ", stopwatch=" + stopwatch.elapsed(TimeUnit.MILLISECONDS) + ", followerTimeoutMillis="
-                + context.getConfigParams().getElectionTimeOutInterval().toMillis() + "]";
+                + ", lastReplicatedIndex=" + lastReplicatedIndex + ", commitIndex=" + sentCommitIndex
+                + ", votingState=" + peerInfo.getVotingState()
+                + ", stopwatch=" + stopwatch.elapsed(TimeUnit.MILLISECONDS)
+                + ", followerTimeoutMillis=" + context.getConfigParams().getElectionTimeOutInterval().toMillis() + "]";
     }
 }