import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.Nullable;
import org.opendaylight.controller.cluster.raft.RaftActorContext;
import org.opendaylight.controller.cluster.raft.RaftState;
private final SyncStatusTracker initialSyncStatusTracker;
- private final Procedure<ReplicatedLogEntry> appendAndPersistCallback =
- logEntry -> context.getReplicatedLog().captureSnapshotIfReady(logEntry);
-
private final Stopwatch lastLeaderMessageTimer = Stopwatch.createStarted();
private SnapshotTracker snapshotTracker = null;
private String leaderId;
log.debug("{}: After cleanup, lastIndex: {}, entries to be added from: {}", logName(),
lastIndex, addEntriesFrom);
+ // When persistence successfully completes for each new log entry appended, we need to determine if we
+ // should capture a snapshot to compact the persisted log. shouldCaptureSnapshot tracks whether or not
+ // one of the log entries has exceeded the log size threshold whereby a snapshot should be taken. However
+ // we don't initiate the snapshot at that log entry but rather after the last log entry has been persisted.
+ // This is done because subsequent log entries after the one that tripped the threshold may have been
+ // applied to the state already, as the persistence callback occurs async, and we want those entries
+ // purged from the persisted log as well.
+ final AtomicBoolean shouldCaptureSnapshot = new AtomicBoolean(false);
+ final Procedure<ReplicatedLogEntry> appendAndPersistCallback = logEntry -> {
+ final ReplicatedLogEntry lastEntryToAppend = appendEntries.getEntries().get(
+ appendEntries.getEntries().size() - 1);
+ if (shouldCaptureSnapshot.get() && logEntry == lastEntryToAppend) {
+ context.getSnapshotManager().capture(context.getReplicatedLog().last(), getReplicatedToAllIndex());
+ }
+ };
+
// 4. Append any new entries not already in the log
for (int i = addEntriesFrom; i < appendEntries.getEntries().size(); i++) {
ReplicatedLogEntry entry = appendEntries.getEntries().get(i);
context.getReplicatedLog().appendAndPersist(entry, appendAndPersistCallback, false);
+ shouldCaptureSnapshot.compareAndSet(false,
+ context.getReplicatedLog().shouldCaptureSnapshot(entry.getIndex()));
+
if (entry.getData() instanceof ServerConfigurationPayload) {
context.updatePeerIds((ServerConfigurationPayload)entry.getData());
}