Allow incremental recovery
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / RaftActorRecoverySupport.java
index 873b8514a2e30038f4327a01507df063b2468211..10375f9406666234bb3f6f69d14cb2ea003ef7f8 100644 (file)
@@ -11,6 +11,7 @@ import akka.persistence.RecoveryCompleted;
 import akka.persistence.SnapshotOffer;
 import com.google.common.base.Stopwatch;
 import java.util.Collections;
 import akka.persistence.SnapshotOffer;
 import com.google.common.base.Stopwatch;
 import java.util.Collections;
+import java.util.concurrent.TimeUnit;
 import org.opendaylight.controller.cluster.PersistentDataProvider;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
 import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
 import org.opendaylight.controller.cluster.PersistentDataProvider;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
 import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
@@ -39,6 +40,7 @@ class RaftActorRecoverySupport {
     private boolean hasMigratedDataRecovered;
 
     private Stopwatch recoveryTimer;
     private boolean hasMigratedDataRecovered;
 
     private Stopwatch recoveryTimer;
+    private Stopwatch recoverySnapshotTimer;
     private final Logger log;
 
     RaftActorRecoverySupport(final RaftActorContext context, final RaftActorRecoveryCohort cohort) {
     private final Logger log;
 
     RaftActorRecoverySupport(final RaftActorContext context, final RaftActorRecoveryCohort cohort) {
@@ -100,16 +102,19 @@ class RaftActorRecoverySupport {
         return context.getReplicatedLog();
     }
 
         return context.getReplicatedLog();
     }
 
-    private void initRecoveryTimer() {
+    private void initRecoveryTimers() {
         if (recoveryTimer == null) {
             recoveryTimer = Stopwatch.createStarted();
         }
         if (recoveryTimer == null) {
             recoveryTimer = Stopwatch.createStarted();
         }
+        if (recoverySnapshotTimer == null && context.getConfigParams().getRecoverySnapshotIntervalSeconds() > 0) {
+            recoverySnapshotTimer = Stopwatch.createStarted();
+        }
     }
 
     private void onRecoveredSnapshot(final SnapshotOffer offer) {
         log.debug("{}: SnapshotOffer called.", context.getId());
 
     }
 
     private void onRecoveredSnapshot(final SnapshotOffer offer) {
         log.debug("{}: SnapshotOffer called.", context.getId());
 
-        initRecoveryTimer();
+        initRecoveryTimers();
 
         Snapshot snapshot = (Snapshot) offer.snapshot();
 
 
         Snapshot snapshot = (Snapshot) offer.snapshot();
 
@@ -200,6 +205,14 @@ class RaftActorRecoverySupport {
             if (logEntry != null) {
                 lastApplied++;
                 batchRecoveredLogEntry(logEntry);
             if (logEntry != null) {
                 lastApplied++;
                 batchRecoveredLogEntry(logEntry);
+                if (shouldTakeRecoverySnapshot() && !context.getSnapshotManager().isCapturing()) {
+                    if (currentRecoveryBatchCount > 0) {
+                        endCurrentLogRecoveryBatch();
+                    }
+                    context.setLastApplied(lastApplied);
+                    context.setCommitIndex(lastApplied);
+                    takeRecoverySnapshot(logEntry);
+                }
             } else {
                 // Shouldn't happen but cover it anyway.
                 log.error("{}: Log entry not found for index {}", context.getId(), i);
             } else {
                 // Shouldn't happen but cover it anyway.
                 log.error("{}: Log entry not found for index {}", context.getId(), i);
@@ -220,7 +233,7 @@ class RaftActorRecoverySupport {
     }
 
     private void batchRecoveredLogEntry(final ReplicatedLogEntry logEntry) {
     }
 
     private void batchRecoveredLogEntry(final ReplicatedLogEntry logEntry) {
-        initRecoveryTimer();
+        initRecoveryTimers();
 
         int batchSize = context.getConfigParams().getJournalRecoveryLogBatchSize();
         if (!isServerConfigurationPayload(logEntry)) {
 
         int batchSize = context.getConfigParams().getJournalRecoveryLogBatchSize();
         if (!isServerConfigurationPayload(logEntry)) {
@@ -236,6 +249,23 @@ class RaftActorRecoverySupport {
         }
     }
 
         }
     }
 
+    private void takeRecoverySnapshot(final ReplicatedLogEntry logEntry) {
+        log.info("Time for recovery snapshot on entry with index {}", logEntry.getIndex());
+        final SnapshotManager snapshotManager = context.getSnapshotManager();
+        if (snapshotManager.capture(logEntry, -1)) {
+            log.info("Capturing snapshot, resetting timer for the next recovery snapshot interval.");
+            this.recoverySnapshotTimer.reset().start();
+        } else {
+            log.info("SnapshotManager is not able to capture snapshot at this time. It will be retried "
+                + "again with the next recovered entry.");
+        }
+    }
+
+    private boolean shouldTakeRecoverySnapshot() {
+        return this.recoverySnapshotTimer != null && this.recoverySnapshotTimer.elapsed(TimeUnit.SECONDS)
+            >= context.getConfigParams().getRecoverySnapshotIntervalSeconds();
+    }
+
     private void endCurrentLogRecoveryBatch() {
         cohort.applyCurrentLogRecoveryBatch();
         currentRecoveryBatchCount = 0;
     private void endCurrentLogRecoveryBatch() {
         cohort.applyCurrentLogRecoveryBatch();
         currentRecoveryBatchCount = 0;
@@ -254,6 +284,11 @@ class RaftActorRecoverySupport {
             recoveryTime = "";
         }
 
             recoveryTime = "";
         }
 
+        if (recoverySnapshotTimer != null) {
+            recoverySnapshotTimer.stop();
+            recoverySnapshotTimer = null;
+        }
+
         log.info("{}: Recovery completed {} - Switching actor to Follower - last log index = {}, last log term = {}, "
                 + "snapshot index = {}, snapshot term = {}, journal size = {}", context.getId(), recoveryTime,
                 replicatedLog().lastIndex(), replicatedLog().lastTerm(), replicatedLog().getSnapshotIndex(),
         log.info("{}: Recovery completed {} - Switching actor to Follower - last log index = {}, last log term = {}, "
                 + "snapshot index = {}, snapshot term = {}, journal size = {}", context.getId(), recoveryTime,
                 replicatedLog().lastIndex(), replicatedLog().lastTerm(), replicatedLog().getSnapshotIndex(),