Use an instanceof pattern
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / RaftActorRecoverySupport.java
index 283aa8f142087c2421f58d7a116351652c58575f..2eb343f6919b89bd03ebd4fea5496736780060a2 100644 (file)
@@ -11,8 +11,10 @@ import akka.persistence.RecoveryCompleted;
 import akka.persistence.SnapshotOffer;
 import com.google.common.base.Stopwatch;
 import java.util.Collections;
+import java.util.concurrent.TimeUnit;
 import org.opendaylight.controller.cluster.PersistentDataProvider;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
+import org.opendaylight.controller.cluster.raft.messages.PersistentPayload;
 import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
 import org.opendaylight.controller.cluster.raft.persisted.DeleteEntries;
 import org.opendaylight.controller.cluster.raft.persisted.EmptyState;
@@ -21,7 +23,6 @@ import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPay
 import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
 import org.opendaylight.controller.cluster.raft.persisted.Snapshot.State;
 import org.opendaylight.controller.cluster.raft.persisted.UpdateElectionTerm;
-import org.opendaylight.controller.cluster.raft.protobuff.client.messages.PersistentPayload;
 import org.slf4j.Logger;
 
 /**
@@ -39,12 +40,13 @@ class RaftActorRecoverySupport {
     private boolean hasMigratedDataRecovered;
 
     private Stopwatch recoveryTimer;
+    private Stopwatch recoverySnapshotTimer;
     private final Logger log;
 
     RaftActorRecoverySupport(final RaftActorContext context, final RaftActorRecoveryCohort cohort) {
         this.context = context;
         this.cohort = cohort;
-        this.log = context.getLogger();
+        log = context.getLogger();
     }
 
     boolean handleRecoveryMessage(final Object message, final PersistentDataProvider persistentProvider) {
@@ -100,16 +102,19 @@ class RaftActorRecoverySupport {
         return context.getReplicatedLog();
     }
 
-    private void initRecoveryTimer() {
+    private void initRecoveryTimers() {
         if (recoveryTimer == null) {
             recoveryTimer = Stopwatch.createStarted();
         }
+        if (recoverySnapshotTimer == null && context.getConfigParams().getRecoverySnapshotIntervalSeconds() > 0) {
+            recoverySnapshotTimer = Stopwatch.createStarted();
+        }
     }
 
     private void onRecoveredSnapshot(final SnapshotOffer offer) {
         log.debug("{}: SnapshotOffer called.", context.getId());
 
-        initRecoveryTimer();
+        initRecoveryTimers();
 
         Snapshot snapshot = (Snapshot) offer.snapshot();
 
@@ -200,6 +205,14 @@ class RaftActorRecoverySupport {
             if (logEntry != null) {
                 lastApplied++;
                 batchRecoveredLogEntry(logEntry);
+                if (shouldTakeRecoverySnapshot() && !context.getSnapshotManager().isCapturing()) {
+                    if (currentRecoveryBatchCount > 0) {
+                        endCurrentLogRecoveryBatch();
+                    }
+                    context.setLastApplied(lastApplied);
+                    context.setCommitIndex(lastApplied);
+                    takeRecoverySnapshot(logEntry);
+                }
             } else {
                 // Shouldn't happen but cover it anyway.
                 log.error("{}: Log entry not found for index {}", context.getId(), i);
@@ -220,7 +233,7 @@ class RaftActorRecoverySupport {
     }
 
     private void batchRecoveredLogEntry(final ReplicatedLogEntry logEntry) {
-        initRecoveryTimer();
+        initRecoveryTimers();
 
         int batchSize = context.getConfigParams().getJournalRecoveryLogBatchSize();
         if (!isServerConfigurationPayload(logEntry)) {
@@ -236,6 +249,23 @@ class RaftActorRecoverySupport {
         }
     }
 
+    private void takeRecoverySnapshot(final ReplicatedLogEntry logEntry) {
+        log.info("Time for recovery snapshot on entry with index {}", logEntry.getIndex());
+        final SnapshotManager snapshotManager = context.getSnapshotManager();
+        if (snapshotManager.capture(logEntry, -1)) {
+            log.info("Capturing snapshot, resetting timer for the next recovery snapshot interval.");
+            recoverySnapshotTimer.reset().start();
+        } else {
+            log.info("SnapshotManager is not able to capture snapshot at this time. It will be retried "
+                + "again with the next recovered entry.");
+        }
+    }
+
+    private boolean shouldTakeRecoverySnapshot() {
+        return recoverySnapshotTimer != null && recoverySnapshotTimer.elapsed(TimeUnit.SECONDS)
+            >= context.getConfigParams().getRecoverySnapshotIntervalSeconds();
+    }
+
     private void endCurrentLogRecoveryBatch() {
         cohort.applyCurrentLogRecoveryBatch();
         currentRecoveryBatchCount = 0;
@@ -246,11 +276,17 @@ class RaftActorRecoverySupport {
             endCurrentLogRecoveryBatch();
         }
 
-        String recoveryTime = "";
+        final String recoveryTime;
         if (recoveryTimer != null) {
-            recoveryTimer.stop();
-            recoveryTime = " in " + recoveryTimer.toString();
+            recoveryTime = " in " + recoveryTimer.stop();
             recoveryTimer = null;
+        } else {
+            recoveryTime = "";
+        }
+
+        if (recoverySnapshotTimer != null) {
+            recoverySnapshotTimer.stop();
+            recoverySnapshotTimer = null;
         }
 
         log.info("{}: Recovery completed {} - Switching actor to Follower - last log index = {}, last log term = {}, "
@@ -302,6 +338,6 @@ class RaftActorRecoverySupport {
     }
 
     private static boolean isMigratedSerializable(final Object message) {
-        return message instanceof MigratedSerializable && ((MigratedSerializable)message).isMigrated();
+        return message instanceof MigratedSerializable migrated && migrated.isMigrated();
     }
 }