Bug 5419: Persist log entries asycnhronously
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / RaftActor.java
index e3fa649ab15294ae16d87ba9b368375020f27e91..46551506e337182df1c0f7a05e2a6fe1e11043ab 100644 (file)
@@ -34,6 +34,7 @@ import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersisten
 import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
 import org.opendaylight.controller.cluster.notifications.RoleChanged;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
+import org.opendaylight.controller.cluster.raft.base.messages.CheckConsensusReached;
 import org.opendaylight.controller.cluster.raft.base.messages.InitiateCaptureSnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.LeaderTransitioning;
 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
@@ -262,7 +263,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
             ApplyJournalEntries applyEntries = (ApplyJournalEntries) message;
             LOG.debug("{}: Persisting ApplyJournalEntries with index={}", persistenceId(), applyEntries.getToIndex());
 
-            persistence().persist(applyEntries, NoopProcedure.instance());
+            persistence().persistAsync(applyEntries, NoopProcedure.instance());
 
         } else if (message instanceof FindLeader) {
             getSender().tell(
@@ -520,32 +521,42 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
         ReplicatedLogEntry replicatedLogEntry = new ReplicatedLogImplEntry(
             context.getReplicatedLog().lastIndex() + 1,
             context.getTermInformation().getCurrentTerm(), data);
+        replicatedLogEntry.setPersistencePending(true);
 
         LOG.debug("{}: Persist data {}", persistenceId(), replicatedLogEntry);
 
         final RaftActorContext raftContext = getRaftActorContext();
 
-        replicatedLog().appendAndPersist(replicatedLogEntry, replicatedLogEntry1 -> {
+        boolean wasAppended = replicatedLog().appendAndPersist(replicatedLogEntry, persistedLogEntry -> {
+            // Clear the persistence pending flag in the log entry.
+            persistedLogEntry.setPersistencePending(false);
+
             if (!hasFollowers()) {
                 // Increment the Commit Index and the Last Applied values
-                raftContext.setCommitIndex(replicatedLogEntry1.getIndex());
-                raftContext.setLastApplied(replicatedLogEntry1.getIndex());
+                raftContext.setCommitIndex(persistedLogEntry.getIndex());
+                raftContext.setLastApplied(persistedLogEntry.getIndex());
 
                 // Apply the state immediately.
-                self().tell(new ApplyState(clientActor, identifier, replicatedLogEntry1), self());
+                self().tell(new ApplyState(clientActor, identifier, persistedLogEntry), self());
 
                 // Send a ApplyJournalEntries message so that we write the fact that we applied
                 // the state to durable storage
-                self().tell(new ApplyJournalEntries(replicatedLogEntry1.getIndex()), self());
+                self().tell(new ApplyJournalEntries(persistedLogEntry.getIndex()), self());
 
             } else {
-                context.getReplicatedLog().captureSnapshotIfReady(replicatedLogEntry1);
+                context.getReplicatedLog().captureSnapshotIfReady(replicatedLogEntry);
 
-                // Send message for replication
-                getCurrentBehavior().handleMessage(getSelf(),
-                        new Replicate(clientActor, identifier, replicatedLogEntry1));
+                // Local persistence is complete so send the CheckConsensusReached message to the behavior (which
+                // normally should still be the leader) to check if consensus has now been reached in conjunction with
+                // follower replication.
+                getCurrentBehavior().handleMessage(getSelf(), CheckConsensusReached.INSTANCE);
             }
-        });
+        }, true);
+
+        if (wasAppended && hasFollowers()) {
+            // Send log entry for replication.
+            getCurrentBehavior().handleMessage(getSelf(), new Replicate(clientActor, identifier, replicatedLogEntry));
+        }
     }
 
     private ReplicatedLog replicatedLog() {