Send commitIndex updates to followers as soon as possible
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / FollowerLogInformation.java
index 3952b386b2498a6e2fecf56d6f5a1cb3f4dda8fc..a76d6a29c272db22c34ff66c23769384305f19e9 100644 (file)
@@ -5,15 +5,16 @@
  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
  * and is available at http://www.eclipse.org/legal/epl-v10.html
  */
-
 package org.opendaylight.controller.cluster.raft;
 
+import static com.google.common.base.Preconditions.checkState;
+import static java.util.Objects.requireNonNull;
+
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
 import com.google.common.base.Stopwatch;
 import java.util.concurrent.TimeUnit;
-import javax.annotation.Nonnull;
-import javax.annotation.Nullable;
+import org.eclipse.jdt.annotation.NonNull;
+import org.eclipse.jdt.annotation.Nullable;
 import org.opendaylight.controller.cluster.raft.behaviors.LeaderInstallSnapshotState;
 
 /**
@@ -35,6 +36,8 @@ public final class FollowerLogInformation {
 
     private long lastReplicatedIndex = -1L;
 
+    private long sentCommitIndex = -1L;
+
     private final Stopwatch lastReplicatedStopwatch = Stopwatch.createUnstarted();
 
     private short payloadVersion = -1;
@@ -51,6 +54,8 @@ public final class FollowerLogInformation {
 
     private long slicedLogEntryIndex = NO_INDEX;
 
+    private boolean needsLeaderAddress;
+
     /**
      * Constructs an instance.
      *
@@ -63,7 +68,7 @@ public final class FollowerLogInformation {
         this.nextIndex = context.getCommitIndex();
         this.matchIndex = matchIndex;
         this.context = context;
-        this.peerInfo = Preconditions.checkNotNull(peerInfo);
+        this.peerInfo = requireNonNull(peerInfo);
     }
 
     /**
@@ -87,16 +92,23 @@ public final class FollowerLogInformation {
     }
 
     /**
-     * Decrements the value of the follower's next index.
+     * Decrements the value of the follower's next index, taking into account its reported last log index.
      *
-     * @return true if the next index was decremented, ie it was previously >= 0, false otherwise.
+     * @param followerLastIndex follower's last reported index.
+     * @return true if the next index was decremented, i.e. it was previously >= 0, false otherwise.
      */
-    public boolean decrNextIndex() {
+    public boolean decrNextIndex(final long followerLastIndex) {
         if (nextIndex < 0) {
             return false;
         }
 
-        nextIndex--;
+        if (followerLastIndex >= 0 && nextIndex > followerLastIndex) {
+            // If the follower's last log index is lower than nextIndex, jump directly to it, so we converge
+            // on a common index more quickly.
+            nextIndex = followerLastIndex;
+        } else {
+            nextIndex--;
+        }
         return true;
     }
 
@@ -226,15 +238,18 @@ public final class FollowerLogInformation {
      * sending duplicate message too frequently if the last replicate message was sent and no reply has been received
      * yet within the current heart beat interval
      *
+     * @param commitIndex current commitIndex
      * @return true if it is OK to replicate, false otherwise
      */
-    public boolean okToReplicate() {
+    public boolean okToReplicate(final long commitIndex) {
         if (peerInfo.getVotingState() == VotingState.VOTING_NOT_INITIALIZED) {
             return false;
         }
 
-        // Return false if we are trying to send duplicate data before the heartbeat interval
-        if (getNextIndex() == lastReplicatedIndex && lastReplicatedStopwatch.elapsed(TimeUnit.MILLISECONDS)
+        // Return false if we are trying to send duplicate data before the heartbeat interval. This check includes
+        // also our commitIndex, as followers need to be told of new commitIndex as soon as possible.
+        if (getNextIndex() == lastReplicatedIndex && !hasStaleCommitIndex(commitIndex)
+                && lastReplicatedStopwatch.elapsed(TimeUnit.MILLISECONDS)
                 < context.getConfigParams().getHeartBeatInterval().toMillis()) {
             return false;
         }
@@ -292,8 +307,7 @@ public final class FollowerLogInformation {
      *
      * @return the LeaderInstallSnapshotState if a snapshot install is in progress, null otherwise.
      */
-    @Nullable
-    public LeaderInstallSnapshotState getInstallSnapshotState() {
+    public @Nullable LeaderInstallSnapshotState getInstallSnapshotState() {
         return installSnapshotState;
     }
 
@@ -302,9 +316,9 @@ public final class FollowerLogInformation {
      *
      * @param state the LeaderInstallSnapshotState
      */
-    public void setLeaderInstallSnapshotState(@Nonnull final LeaderInstallSnapshotState state) {
+    public void setLeaderInstallSnapshotState(final @NonNull LeaderInstallSnapshotState state) {
         if (this.installSnapshotState == null) {
-            this.installSnapshotState = Preconditions.checkNotNull(state);
+            this.installSnapshotState = requireNonNull(state);
         }
     }
 
@@ -312,7 +326,7 @@ public final class FollowerLogInformation {
      * Clears the LeaderInstallSnapshotState when an install snapshot is complete.
      */
     public void clearLeaderInstallSnapshotState() {
-        Preconditions.checkState(installSnapshotState != null);
+        checkState(installSnapshotState != null);
         installSnapshotState.close();
         installSnapshotState = null;
     }
@@ -336,11 +350,28 @@ public final class FollowerLogInformation {
         return slicedLogEntryIndex != NO_INDEX;
     }
 
+    public void setNeedsLeaderAddress(final boolean value) {
+        needsLeaderAddress = value;
+    }
+
+    public @Nullable String needsLeaderAddress(final String leaderId) {
+        return needsLeaderAddress ? context.getPeerAddress(leaderId) : null;
+    }
+
+    public boolean hasStaleCommitIndex(final long commitIndex) {
+        return sentCommitIndex != commitIndex;
+    }
+
+    public void setSentCommitIndex(final long commitIndex) {
+        sentCommitIndex = commitIndex;
+    }
+
     @Override
     public String toString() {
         return "FollowerLogInformation [id=" + getId() + ", nextIndex=" + nextIndex + ", matchIndex=" + matchIndex
-                + ", lastReplicatedIndex=" + lastReplicatedIndex + ", votingState=" + peerInfo.getVotingState()
-                + ", stopwatch=" + stopwatch.elapsed(TimeUnit.MILLISECONDS) + ", followerTimeoutMillis="
-                + context.getConfigParams().getElectionTimeOutInterval().toMillis() + "]";
+                + ", lastReplicatedIndex=" + lastReplicatedIndex + ", commitIndex=" + sentCommitIndex
+                + ", votingState=" + peerInfo.getVotingState()
+                + ", stopwatch=" + stopwatch.elapsed(TimeUnit.MILLISECONDS)
+                + ", followerTimeoutMillis=" + context.getConfigParams().getElectionTimeOutInterval().toMillis() + "]";
     }
 }