Bug 7362: Notify applyState synchronously
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / behaviors / Follower.java
index 7e8a7725910d599f7cd56ef40daaa06de2f8bece..6f107e9ae61ee060071c842810cb82c3e7f92f85 100644 (file)
@@ -22,6 +22,7 @@ import java.util.ArrayList;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import javax.annotation.Nullable;
 import org.opendaylight.controller.cluster.raft.RaftActorContext;
 import org.opendaylight.controller.cluster.raft.RaftState;
@@ -55,9 +56,6 @@ public class Follower extends AbstractRaftActorBehavior {
 
     private final SyncStatusTracker initialSyncStatusTracker;
 
-    private final Procedure<ReplicatedLogEntry> appendAndPersistCallback =
-        logEntry -> context.getReplicatedLog().captureSnapshotIfReady(logEntry);
-
     private final Stopwatch lastLeaderMessageTimer = Stopwatch.createStarted();
     private SnapshotTracker snapshotTracker = null;
     private String leaderId;
@@ -236,6 +234,22 @@ public class Follower extends AbstractRaftActorBehavior {
             log.debug("{}: After cleanup, lastIndex: {}, entries to be added from: {}", logName(),
                     lastIndex, addEntriesFrom);
 
+            // When persistence successfully completes for each new log entry appended, we need to determine if we
+            // should capture a snapshot to compact the persisted log. shouldCaptureSnapshot tracks whether or not
+            // one of the log entries has exceeded the log size threshold whereby a snapshot should be taken. However
+            // we don't initiate the snapshot at that log entry but rather after the last log entry has been persisted.
+            // This is done because subsequent log entries after the one that tripped the threshold may have been
+            // applied to the state already, as the persistence callback occurs async, and we want those entries
+            // purged from the persisted log as well.
+            final AtomicBoolean shouldCaptureSnapshot = new AtomicBoolean(false);
+            final Procedure<ReplicatedLogEntry> appendAndPersistCallback = logEntry -> {
+                final ReplicatedLogEntry lastEntryToAppend = appendEntries.getEntries().get(
+                        appendEntries.getEntries().size() - 1);
+                if (shouldCaptureSnapshot.get() && logEntry == lastEntryToAppend) {
+                    context.getSnapshotManager().capture(context.getReplicatedLog().last(), getReplicatedToAllIndex());
+                }
+            };
+
             // 4. Append any new entries not already in the log
             for (int i = addEntriesFrom; i < appendEntries.getEntries().size(); i++) {
                 ReplicatedLogEntry entry = appendEntries.getEntries().get(i);
@@ -244,6 +258,9 @@ public class Follower extends AbstractRaftActorBehavior {
 
                 context.getReplicatedLog().appendAndPersist(entry, appendAndPersistCallback, false);
 
+                shouldCaptureSnapshot.compareAndSet(false,
+                        context.getReplicatedLog().shouldCaptureSnapshot(entry.getIndex()));
+
                 if (entry.getData() instanceof ServerConfigurationPayload) {
                     context.updatePeerIds((ServerConfigurationPayload)entry.getData());
                 }