Bug 5419: Persist log entries asynchronously
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / behaviors / AbstractLeader.java
index 31bf99c2dc2bc9f5564c256294c605f6af1bd482..d97905cf11e4f7174bc85505de27b1687da347d2 100644 (file)
@@ -35,6 +35,7 @@ import org.opendaylight.controller.cluster.raft.RaftState;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
 import org.opendaylight.controller.cluster.raft.Snapshot;
 import org.opendaylight.controller.cluster.raft.VotingState;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
 import org.opendaylight.controller.cluster.raft.Snapshot;
 import org.opendaylight.controller.cluster.raft.VotingState;
+import org.opendaylight.controller.cluster.raft.base.messages.CheckConsensusReached;
 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
@@ -291,17 +292,47 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
             }
         }
 
             }
         }
 
-        // Now figure out if this reply warrants a change in the commitIndex
-        // If there exists an N such that N > commitIndex, a majority
-        // of matchIndex[i] ≥ N, and log[N].term == currentTerm:
-        // set commitIndex = N (§5.3, §5.4).
         if (log.isTraceEnabled()) {
             log.trace("{}: handleAppendEntriesReply from {}: commitIndex: {}, lastAppliedIndex: {}, currentTerm: {}",
                     logName(), followerId, context.getCommitIndex(), context.getLastApplied(), currentTerm());
         }
 
         if (log.isTraceEnabled()) {
             log.trace("{}: handleAppendEntriesReply from {}: commitIndex: {}, lastAppliedIndex: {}, currentTerm: {}",
                     logName(), followerId, context.getCommitIndex(), context.getLastApplied(), currentTerm());
         }
 
+        possiblyUpdateCommitIndex();
+
+        //Send the next log entry immediately, if possible, no need to wait for heartbeat to trigger that event
+        sendUpdatesToFollower(followerId, followerLogInformation, false, !updated);
+
+        return this;
+    }
+
+    private void possiblyUpdateCommitIndex() {
+        // Figure out if we can update the commitIndex as follows:
+        //   If there exists an index N such that N > commitIndex, a majority of matchIndex[i] ≥ N,
+        //     and log[N].term == currentTerm:
+        //   set commitIndex = N (§5.3, §5.4).
         for (long index = context.getCommitIndex() + 1; ; index++) {
         for (long index = context.getCommitIndex() + 1; ; index++) {
-            int replicatedCount = 1;
+            ReplicatedLogEntry replicatedLogEntry = context.getReplicatedLog().get(index);
+            if (replicatedLogEntry == null) {
+                log.trace("{}: ReplicatedLogEntry not found for index {} - snapshotIndex: {}, journal size: {}",
+                        logName(), index, context.getReplicatedLog().getSnapshotIndex(),
+                        context.getReplicatedLog().size());
+                break;
+            }
+
+            // Count our entry if it has been persisted.
+            int replicatedCount = replicatedLogEntry.isPersistencePending() ? 0 : 1;
+
+            if (replicatedCount == 0) {
+                // We don't commit and apply a log entry until we've gotten the ack from our local persistence. Ideally
+                // we should be able to update the commit index if we get a consensus amongst the followers
+                // w/o the local persistence ack however this can cause timing issues with snapshot capture
+                // which may lead to an entry that is neither in the serialized snapshot state nor in the snapshot's
+                // unapplied entries. This can happen if the lastAppliedIndex is updated but the corresponding
+                // ApplyState message is still pending in the message queue and thus the corresponding log entry hasn't
+                // actually been applied to the state yet. This would be alleviated by eliminating the ApplyState
+                // message in lieu of synchronously updating lastAppliedIndex and applying to state.
+                break;
+            }
 
             log.trace("{}: checking Nth index {}", logName(), index);
             for (FollowerLogInformation info : followerToLog.values()) {
 
             log.trace("{}: checking Nth index {}", logName(), index);
             for (FollowerLogInformation info : followerToLog.values()) {
@@ -320,14 +351,6 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
             }
 
             if (replicatedCount >= minReplicationCount) {
             }
 
             if (replicatedCount >= minReplicationCount) {
-                ReplicatedLogEntry replicatedLogEntry = context.getReplicatedLog().get(index);
-                if (replicatedLogEntry == null) {
-                    log.debug("{}: ReplicatedLogEntry not found for index {} - snapshotIndex: {}, journal size: {}",
-                            logName(), index, context.getReplicatedLog().getSnapshotIndex(),
-                            context.getReplicatedLog().size());
-                    break;
-                }
-
                 // Don't update the commit index if the log entry is from a previous term, as per §5.4.1:
                 // "Raft never commits log entries from previous terms by counting replicas".
                 // However we keep looping so we can make progress when new entries in the current term
                 // Don't update the commit index if the log entry is from a previous term, as per §5.4.1:
                 // "Raft never commits log entries from previous terms by counting replicas".
                 // However we keep looping so we can make progress when new entries in the current term
@@ -349,11 +372,8 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
 
         // Apply the change to the state machine
         if (context.getCommitIndex() > context.getLastApplied()) {
 
         // Apply the change to the state machine
         if (context.getCommitIndex() > context.getLastApplied()) {
-            if (log.isDebugEnabled()) {
-                log.debug(
-                    "{}: handleAppendEntriesReply from {}: applying to log - commitIndex: {}, lastAppliedIndex: {}",
-                    logName(), followerId, context.getCommitIndex(), context.getLastApplied());
-            }
+            log.debug("{}: Applying to log - commitIndex: {}, lastAppliedIndex: {}", logName(),
+                    context.getCommitIndex(), context.getLastApplied());
 
             applyLogToStateMachine(context.getCommitIndex());
         }
 
             applyLogToStateMachine(context.getCommitIndex());
         }
@@ -361,11 +381,6 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
         if (!context.getSnapshotManager().isCapturing()) {
             purgeInMemoryLog();
         }
         if (!context.getSnapshotManager().isCapturing()) {
             purgeInMemoryLog();
         }
-
-        //Send the next log entry immediately, if possible, no need to wait for heartbeat to trigger that event
-        sendUpdatesToFollower(followerId, followerLogInformation, false, !updated);
-
-        return this;
     }
 
     private boolean updateFollowerLogInformation(FollowerLogInformation followerLogInformation,
     }
 
     private boolean updateFollowerLogInformation(FollowerLogInformation followerLogInformation,
@@ -448,6 +463,8 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
             replicate((Replicate) message);
         } else if (message instanceof InstallSnapshotReply) {
             handleInstallSnapshotReply((InstallSnapshotReply) message);
             replicate((Replicate) message);
         } else if (message instanceof InstallSnapshotReply) {
             handleInstallSnapshotReply((InstallSnapshotReply) message);
+        } else if (message instanceof CheckConsensusReached) {
+            possiblyUpdateCommitIndex();
         } else {
             return super.handleMessage(sender, message);
         }
         } else {
             return super.handleMessage(sender, message);
         }