Improve Candidate.votingPeers
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / RaftActorRecoverySupport.java
index f071d945662a73d22803fc9ea277a88d88ef29c1..389e8dfd8ff942a090a9201fa5432b944402d9d8 100644 (file)
@@ -11,16 +11,18 @@ import akka.persistence.RecoveryCompleted;
 import akka.persistence.SnapshotOffer;
 import com.google.common.base.Stopwatch;
 import java.util.Collections;
+import java.util.concurrent.TimeUnit;
 import org.opendaylight.controller.cluster.PersistentDataProvider;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
+import org.opendaylight.controller.cluster.raft.messages.PersistentPayload;
 import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
 import org.opendaylight.controller.cluster.raft.persisted.DeleteEntries;
 import org.opendaylight.controller.cluster.raft.persisted.EmptyState;
 import org.opendaylight.controller.cluster.raft.persisted.MigratedSerializable;
 import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
 import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
+import org.opendaylight.controller.cluster.raft.persisted.Snapshot.State;
 import org.opendaylight.controller.cluster.raft.persisted.UpdateElectionTerm;
-import org.opendaylight.controller.cluster.raft.protobuff.client.messages.PersistentPayload;
 import org.slf4j.Logger;
 
 /**
@@ -38,12 +40,13 @@ class RaftActorRecoverySupport {
     private boolean hasMigratedDataRecovered;
 
     private Stopwatch recoveryTimer;
+    private Stopwatch recoverySnapshotTimer;
     private final Logger log;
 
     RaftActorRecoverySupport(final RaftActorContext context, final RaftActorRecoveryCohort cohort) {
         this.context = context;
         this.cohort = cohort;
-        this.log = context.getLogger();
+        log = context.getLogger();
     }
 
     boolean handleRecoveryMessage(final Object message, final PersistentDataProvider persistentProvider) {
@@ -56,19 +59,18 @@ class RaftActorRecoverySupport {
         }
 
         boolean recoveryComplete = false;
-        if (message instanceof UpdateElectionTerm) {
-            context.getTermInformation().update(((UpdateElectionTerm) message).getCurrentTerm(),
-                    ((UpdateElectionTerm) message).getVotedFor());
-        } else if (message instanceof SnapshotOffer) {
-            onRecoveredSnapshot((SnapshotOffer) message);
-        } else if (message instanceof ReplicatedLogEntry) {
-            onRecoveredJournalLogEntry((ReplicatedLogEntry) message);
-        } else if (message instanceof ApplyJournalEntries) {
-            onRecoveredApplyLogEntries(((ApplyJournalEntries) message).getToIndex());
-        } else if (message instanceof DeleteEntries) {
-            onDeleteEntries((DeleteEntries) message);
-        } else if (message instanceof ServerConfigurationPayload) {
-            context.updatePeerIds((ServerConfigurationPayload)message);
+        if (message instanceof UpdateElectionTerm updateElectionTerm) {
+            context.getTermInformation().update(updateElectionTerm.getCurrentTerm(), updateElectionTerm.getVotedFor());
+        } else if (message instanceof SnapshotOffer snapshotOffer) {
+            onRecoveredSnapshot(snapshotOffer);
+        } else if (message instanceof ReplicatedLogEntry replicatedLogEntry) {
+            onRecoveredJournalLogEntry(replicatedLogEntry);
+        } else if (message instanceof ApplyJournalEntries applyJournalEntries) {
+            onRecoveredApplyLogEntries(applyJournalEntries.getToIndex());
+        } else if (message instanceof DeleteEntries deleteEntries) {
+            onDeleteEntries(deleteEntries);
+        } else if (message instanceof ServerConfigurationPayload serverConfigurationPayload) {
+            context.updatePeerIds(serverConfigurationPayload);
         } else if (message instanceof RecoveryCompleted) {
             recoveryComplete = true;
             onRecoveryCompletedMessage(persistentProvider);
@@ -99,16 +101,19 @@ class RaftActorRecoverySupport {
         return context.getReplicatedLog();
     }
 
-    private void initRecoveryTimer() {
+    private void initRecoveryTimers() {
         if (recoveryTimer == null) {
             recoveryTimer = Stopwatch.createStarted();
         }
+        if (recoverySnapshotTimer == null && context.getConfigParams().getRecoverySnapshotIntervalSeconds() > 0) {
+            recoverySnapshotTimer = Stopwatch.createStarted();
+        }
     }
 
     private void onRecoveredSnapshot(final SnapshotOffer offer) {
         log.debug("{}: SnapshotOffer called.", context.getId());
 
-        initRecoveryTimer();
+        initRecoveryTimers();
 
         Snapshot snapshot = (Snapshot) offer.snapshot();
 
@@ -136,11 +141,15 @@ class RaftActorRecoverySupport {
         context.setCommitIndex(snapshot.getLastAppliedIndex());
         context.getTermInformation().update(snapshot.getElectionTerm(), snapshot.getElectionVotedFor());
 
-        Stopwatch timer = Stopwatch.createStarted();
+        final Stopwatch timer = Stopwatch.createStarted();
 
         // Apply the snapshot to the actors state
-        if (!(snapshot.getState() instanceof EmptyState)) {
-            cohort.applyRecoverySnapshot(snapshot.getState());
+        final State snapshotState = snapshot.getState();
+        if (snapshotState.needsMigration()) {
+            hasMigratedDataRecovered = true;
+        }
+        if (!(snapshotState instanceof EmptyState)) {
+            cohort.applyRecoverySnapshot(snapshotState);
         }
 
         if (snapshot.getServerConfiguration() != null) {
@@ -149,8 +158,8 @@ class RaftActorRecoverySupport {
 
         timer.stop();
         log.info("Recovery snapshot applied for {} in {}: snapshotIndex={}, snapshotTerm={}, journal-size={}",
-                context.getId(), timer.toString(), replicatedLog().getSnapshotIndex(),
-                replicatedLog().getSnapshotTerm(), replicatedLog().size());
+                context.getId(), timer, replicatedLog().getSnapshotIndex(), replicatedLog().getSnapshotTerm(),
+                replicatedLog().size());
     }
 
     private void onRecoveredJournalLogEntry(final ReplicatedLogEntry logEntry) {
@@ -195,6 +204,14 @@ class RaftActorRecoverySupport {
             if (logEntry != null) {
                 lastApplied++;
                 batchRecoveredLogEntry(logEntry);
+                if (shouldTakeRecoverySnapshot() && !context.getSnapshotManager().isCapturing()) {
+                    if (currentRecoveryBatchCount > 0) {
+                        endCurrentLogRecoveryBatch();
+                    }
+                    context.setLastApplied(lastApplied);
+                    context.setCommitIndex(lastApplied);
+                    takeRecoverySnapshot(logEntry);
+                }
             } else {
                 // Shouldn't happen but cover it anyway.
                 log.error("{}: Log entry not found for index {}", context.getId(), i);
@@ -215,7 +232,7 @@ class RaftActorRecoverySupport {
     }
 
     private void batchRecoveredLogEntry(final ReplicatedLogEntry logEntry) {
-        initRecoveryTimer();
+        initRecoveryTimers();
 
         int batchSize = context.getConfigParams().getJournalRecoveryLogBatchSize();
         if (!isServerConfigurationPayload(logEntry)) {
@@ -231,6 +248,23 @@ class RaftActorRecoverySupport {
         }
     }
 
+    private void takeRecoverySnapshot(final ReplicatedLogEntry logEntry) {
+        log.info("Time for recovery snapshot on entry with index {}", logEntry.getIndex());
+        final SnapshotManager snapshotManager = context.getSnapshotManager();
+        if (snapshotManager.capture(logEntry, -1)) {
+            log.info("Capturing snapshot, resetting timer for the next recovery snapshot interval.");
+            recoverySnapshotTimer.reset().start();
+        } else {
+            log.info("SnapshotManager is not able to capture snapshot at this time. It will be retried "
+                + "again with the next recovered entry.");
+        }
+    }
+
+    private boolean shouldTakeRecoverySnapshot() {
+        return recoverySnapshotTimer != null && recoverySnapshotTimer.elapsed(TimeUnit.SECONDS)
+            >= context.getConfigParams().getRecoverySnapshotIntervalSeconds();
+    }
+
     private void endCurrentLogRecoveryBatch() {
         cohort.applyCurrentLogRecoveryBatch();
         currentRecoveryBatchCount = 0;
@@ -241,17 +275,23 @@ class RaftActorRecoverySupport {
             endCurrentLogRecoveryBatch();
         }
 
-        String recoveryTime = "";
+        final String recoveryTime;
         if (recoveryTimer != null) {
-            recoveryTimer.stop();
-            recoveryTime = " in " + recoveryTimer.toString();
+            recoveryTime = " in " + recoveryTimer.stop();
             recoveryTimer = null;
+        } else {
+            recoveryTime = "";
         }
 
-        log.info("Recovery completed {} - Switching actor to Follower - Persistence Id = {}"
-                 + " Last index in log = {}, snapshotIndex = {}, snapshotTerm = {}, journal-size = {}",
-                 recoveryTime, context.getId(), replicatedLog().lastIndex(), replicatedLog().getSnapshotIndex(),
-                 replicatedLog().getSnapshotTerm(), replicatedLog().size());
+        if (recoverySnapshotTimer != null) {
+            recoverySnapshotTimer.stop();
+            recoverySnapshotTimer = null;
+        }
+
+        log.info("{}: Recovery completed {} - Switching actor to Follower - last log index = {}, last log term = {}, "
+                + "snapshot index = {}, snapshot term = {}, journal size = {}", context.getId(), recoveryTime,
+                replicatedLog().lastIndex(), replicatedLog().lastTerm(), replicatedLog().getSnapshotIndex(),
+                replicatedLog().getSnapshotTerm(), replicatedLog().size());
 
         if (dataRecoveredWithPersistenceDisabled
                 || hasMigratedDataRecovered && !context.getPersistenceProvider().isRecoveryApplicable()) {
@@ -297,6 +337,6 @@ class RaftActorRecoverySupport {
     }
 
     private static boolean isMigratedSerializable(final Object message) {
-        return message instanceof MigratedSerializable && ((MigratedSerializable)message).isMigrated();
+        return message instanceof MigratedSerializable migrated && migrated.isMigrated();
     }
 }