Fix eclipse warnings
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / behaviors / AbstractLeader.java
index fab1714989a99407f3d51452d33ef313ac2c14e8..10c1a156f9cbbdb528076ad70fa6ecbff48052dc 100644 (file)
@@ -35,6 +35,7 @@ import org.opendaylight.controller.cluster.raft.RaftState;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
 import org.opendaylight.controller.cluster.raft.Snapshot;
 import org.opendaylight.controller.cluster.raft.VotingState;
+import org.opendaylight.controller.cluster.raft.base.messages.CheckConsensusReached;
 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
@@ -50,7 +51,8 @@ import scala.concurrent.duration.FiniteDuration;
 
 /**
  * The behavior of a RaftActor when it is in the Leader state
- * <p/>
+ *
+ * <p>
  * Leaders:
  * <ul>
  * <li> Upon election: send initial empty AppendEntries RPCs
@@ -64,7 +66,7 @@ import scala.concurrent.duration.FiniteDuration;
  * follower (§5.3)
  * <li> If AppendEntries fails because of log inconsistency:
  * decrement nextIndex and retry (§5.3)
- * <li> If there exists an N such that N > commitIndex, a majority
+ * <li> If there exists an N such that N &gt; commitIndex, a majority
  * of matchIndex[i] ≥ N, and log[N].term == currentTerm:
  * set commitIndex = N (§5.3, §5.4).
  * </ul>
@@ -290,17 +292,42 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
             }
         }
 
-        // Now figure out if this reply warrants a change in the commitIndex
-        // If there exists an N such that N > commitIndex, a majority
-        // of matchIndex[i] ≥ N, and log[N].term == currentTerm:
-        // set commitIndex = N (§5.3, §5.4).
         if (log.isTraceEnabled()) {
             log.trace("{}: handleAppendEntriesReply from {}: commitIndex: {}, lastAppliedIndex: {}, currentTerm: {}",
                     logName(), followerId, context.getCommitIndex(), context.getLastApplied(), currentTerm());
         }
 
+        possiblyUpdateCommitIndex();
+
+        //Send the next log entry immediately, if possible, no need to wait for heartbeat to trigger that event
+        sendUpdatesToFollower(followerId, followerLogInformation, false, !updated);
+
+        return this;
+    }
+
+    private void possiblyUpdateCommitIndex() {
+        // Figure out if we can update the the commitIndex as follows:
+        //   If there exists an index N such that N > commitIndex, a majority of matchIndex[i] ≥ N,
+        //     and log[N].term == currentTerm:
+        //   set commitIndex = N (§5.3, §5.4).
         for (long index = context.getCommitIndex() + 1; ; index++) {
-            int replicatedCount = 1;
+            ReplicatedLogEntry replicatedLogEntry = context.getReplicatedLog().get(index);
+            if (replicatedLogEntry == null) {
+                log.trace("{}: ReplicatedLogEntry not found for index {} - snapshotIndex: {}, journal size: {}",
+                        logName(), index, context.getReplicatedLog().getSnapshotIndex(),
+                        context.getReplicatedLog().size());
+                break;
+            }
+
+            // Count our entry if it has been persisted.
+            int replicatedCount = replicatedLogEntry.isPersistencePending() ? 0 : 1;
+
+            if (replicatedCount == 0) {
+                // We don't commit and apply a log entry until we've gotten the ack from our local persistence,
+                // even though there *shouldn't* be any issue with updating the commit index if we get a consensus
+                // amongst the followers w/o the local persistence ack.
+                break;
+            }
 
             log.trace("{}: checking Nth index {}", logName(), index);
             for (FollowerLogInformation info : followerToLog.values()) {
@@ -319,14 +346,6 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
             }
 
             if (replicatedCount >= minReplicationCount) {
-                ReplicatedLogEntry replicatedLogEntry = context.getReplicatedLog().get(index);
-                if (replicatedLogEntry == null) {
-                    log.debug("{}: ReplicatedLogEntry not found for index {} - snapshotIndex: {}, journal size: {}",
-                            logName(), index, context.getReplicatedLog().getSnapshotIndex(),
-                            context.getReplicatedLog().size());
-                    break;
-                }
-
                 // Don't update the commit index if the log entry is from a previous term, as per §5.4.1:
                 // "Raft never commits log entries from previous terms by counting replicas".
                 // However we keep looping so we can make progress when new entries in the current term
@@ -348,11 +367,8 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
 
         // Apply the change to the state machine
         if (context.getCommitIndex() > context.getLastApplied()) {
-            if (log.isDebugEnabled()) {
-                log.debug(
-                    "{}: handleAppendEntriesReply from {}: applying to log - commitIndex: {}, lastAppliedIndex: {}",
-                    logName(), followerId, context.getCommitIndex(), context.getLastApplied());
-            }
+            log.debug("{}: Applying to log - commitIndex: {}, lastAppliedIndex: {}", logName(),
+                    context.getCommitIndex(), context.getLastApplied());
 
             applyLogToStateMachine(context.getCommitIndex());
         }
@@ -360,11 +376,6 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
         if (!context.getSnapshotManager().isCapturing()) {
             purgeInMemoryLog();
         }
-
-        //Send the next log entry immediately, if possible, no need to wait for heartbeat to trigger that event
-        sendUpdatesToFollower(followerId, followerLogInformation, false, !updated);
-
-        return this;
     }
 
     private boolean updateFollowerLogInformation(FollowerLogInformation followerLogInformation,
@@ -447,6 +458,8 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
             replicate((Replicate) message);
         } else if (message instanceof InstallSnapshotReply) {
             handleInstallSnapshotReply((InstallSnapshotReply) message);
+        } else if (message instanceof CheckConsensusReached) {
+            possiblyUpdateCommitIndex();
         } else {
             return super.handleMessage(sender, message);
         }
@@ -553,8 +566,9 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
     private void replicate(Replicate replicate) {
         long logIndex = replicate.getReplicatedLogEntry().getIndex();
 
-        log.debug("{}: Replicate message: identifier: {}, logIndex: {}, payload: {}", logName(),
-                replicate.getIdentifier(), logIndex, replicate.getReplicatedLogEntry().getData().getClass());
+        log.debug("{}: Replicate message: identifier: {}, logIndex: {}, payload: {}, isSendImmediate: {}", logName(),
+                replicate.getIdentifier(), logIndex, replicate.getReplicatedLogEntry().getData().getClass(),
+                replicate.isSendImmediate());
 
         // Create a tracker entry we will use this later to notify the
         // client actor
@@ -571,7 +585,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
             applyLogToStateMachine(logIndex);
         }
 
-        if (!followerToLog.isEmpty()) {
+        if (replicate.isSendImmediate() && !followerToLog.isEmpty()) {
             sendAppendEntries(0, false);
         }
     }
@@ -592,8 +606,6 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
     /**
      * This method checks if any update needs to be sent to the given follower. This includes append log entries,
      * sending next snapshot chunk, and initiating a snapshot.
-     *
-     * @return true if any update is sent, false otherwise
      */
     private void sendUpdatesToFollower(String followerId, FollowerLogInformation followerLogInformation,
                                        boolean sendHeartbeat, boolean isHeartbeat) {
@@ -699,7 +711,8 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
 
     /**
      * Initiates a snapshot capture to install on a follower.
-     * <p/>
+     *
+     * <p>
      * Install Snapshot works as follows
      *   1. Leader initiates the capture snapshot by calling createSnapshot on the RaftActor.
      *   2. On receipt of the CaptureSnapshotReply message, the RaftActor persists the snapshot and makes a call to
@@ -725,16 +738,16 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
             // Note: sendSnapshotChunk will set the LeaderInstallSnapshotState.
             sendSnapshotChunk(followerActor, followerLogInfo);
             return true;
-        } else {
-            boolean captureInitiated = context.getSnapshotManager().captureToInstall(context.getReplicatedLog().last(),
-                    this.getReplicatedToAllIndex(), followerId);
-            if (captureInitiated) {
-                followerLogInfo.setLeaderInstallSnapshotState(new LeaderInstallSnapshotState(
-                        context.getConfigParams().getSnapshotChunkSize(), logName()));
-            }
+        }
 
-            return captureInitiated;
+        boolean captureInitiated = context.getSnapshotManager().captureToInstall(context.getReplicatedLog().last(),
+            this.getReplicatedToAllIndex(), followerId);
+        if (captureInitiated) {
+            followerLogInfo.setLeaderInstallSnapshotState(new LeaderInstallSnapshotState(
+                context.getConfigParams().getSnapshotChunkSize(), logName()));
         }
+
+        return captureInitiated;
     }
 
     private boolean canInstallSnapshot(long nextIndex) {