Allow incremental recovery
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / RaftActorRecoverySupport.java
index 5e4e6571a10d3d1f998796849a9be7486d86de75..10375f9406666234bb3f6f69d14cb2ea003ef7f8 100644 (file)
@@ -10,16 +10,17 @@ package org.opendaylight.controller.cluster.raft;
 import akka.persistence.RecoveryCompleted;
 import akka.persistence.SnapshotOffer;
 import com.google.common.base.Stopwatch;
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.io.ObjectInputStream;
 import java.util.Collections;
+import java.util.concurrent.TimeUnit;
 import org.opendaylight.controller.cluster.PersistentDataProvider;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
 import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
 import org.opendaylight.controller.cluster.raft.persisted.DeleteEntries;
+import org.opendaylight.controller.cluster.raft.persisted.EmptyState;
 import org.opendaylight.controller.cluster.raft.persisted.MigratedSerializable;
 import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
+import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
+import org.opendaylight.controller.cluster.raft.persisted.Snapshot.State;
 import org.opendaylight.controller.cluster.raft.persisted.UpdateElectionTerm;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.PersistentPayload;
 import org.slf4j.Logger;
@@ -39,6 +40,7 @@ class RaftActorRecoverySupport {
     private boolean hasMigratedDataRecovered;
 
     private Stopwatch recoveryTimer;
+    private Stopwatch recoverySnapshotTimer;
     private final Logger log;
 
     RaftActorRecoverySupport(final RaftActorContext context, final RaftActorRecoveryCohort cohort) {
@@ -47,7 +49,7 @@ class RaftActorRecoverySupport {
         this.log = context.getLogger();
     }
 
-    boolean handleRecoveryMessage(Object message, PersistentDataProvider persistentProvider) {
+    boolean handleRecoveryMessage(final Object message, final PersistentDataProvider persistentProvider) {
         log.trace("{}: handleRecoveryMessage: {}", context.getId(), message);
 
         anyDataRecovered = anyDataRecovered || !(message instanceof RecoveryCompleted);
@@ -80,7 +82,7 @@ class RaftActorRecoverySupport {
 
     @SuppressWarnings("checkstyle:IllegalCatch")
     private void possiblyRestoreFromSnapshot() {
-        byte[] restoreFromSnapshot = cohort.getRestoreFromSnapshot();
+        Snapshot restoreFromSnapshot = cohort.getRestoreFromSnapshot();
         if (restoreFromSnapshot == null) {
             return;
         }
@@ -91,31 +93,28 @@ class RaftActorRecoverySupport {
             return;
         }
 
-        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(restoreFromSnapshot))) {
-            Snapshot snapshot = (Snapshot) ois.readObject();
+        log.debug("{}: Restore snapshot: {}", context.getId(), restoreFromSnapshot);
 
-            log.debug("{}: Deserialized restore snapshot: {}", context.getId(), snapshot);
-
-            context.getSnapshotManager().apply(new ApplySnapshot(snapshot));
-        } catch (RuntimeException | ClassNotFoundException | IOException e) {
-            log.error("{}: Error deserializing snapshot restore", context.getId(), e);
-        }
+        context.getSnapshotManager().apply(new ApplySnapshot(restoreFromSnapshot));
     }
 
     private ReplicatedLog replicatedLog() {
         return context.getReplicatedLog();
     }
 
-    private void initRecoveryTimer() {
+    private void initRecoveryTimers() {
         if (recoveryTimer == null) {
             recoveryTimer = Stopwatch.createStarted();
         }
+        if (recoverySnapshotTimer == null && context.getConfigParams().getRecoverySnapshotIntervalSeconds() > 0) {
+            recoverySnapshotTimer = Stopwatch.createStarted();
+        }
     }
 
-    private void onRecoveredSnapshot(SnapshotOffer offer) {
-        log.debug("{}: SnapshotOffer called..", context.getId());
+    private void onRecoveredSnapshot(final SnapshotOffer offer) {
+        log.debug("{}: SnapshotOffer called.", context.getId());
 
-        initRecoveryTimer();
+        initRecoveryTimers();
 
         Snapshot snapshot = (Snapshot) offer.snapshot();
 
@@ -129,7 +128,8 @@ class RaftActorRecoverySupport {
             // We may have just transitioned to disabled and have a snapshot containing state data and/or log
             // entries - we don't want to preserve these, only the server config and election term info.
 
-            snapshot = Snapshot.create(new byte[0], Collections.emptyList(), -1, -1, -1, -1,
+            snapshot = Snapshot.create(
+                    EmptyState.INSTANCE, Collections.emptyList(), -1, -1, -1, -1,
                     snapshot.getElectionTerm(), snapshot.getElectionVotedFor(), snapshot.getServerConfiguration());
         }
 
@@ -142,26 +142,28 @@ class RaftActorRecoverySupport {
         context.setCommitIndex(snapshot.getLastAppliedIndex());
         context.getTermInformation().update(snapshot.getElectionTerm(), snapshot.getElectionVotedFor());
 
-        Stopwatch timer = Stopwatch.createStarted();
+        final Stopwatch timer = Stopwatch.createStarted();
 
         // Apply the snapshot to the actors state
-        cohort.applyRecoverySnapshot(snapshot.getState());
+        final State snapshotState = snapshot.getState();
+        if (snapshotState.needsMigration()) {
+            hasMigratedDataRecovered = true;
+        }
+        if (!(snapshotState instanceof EmptyState)) {
+            cohort.applyRecoverySnapshot(snapshotState);
+        }
 
         if (snapshot.getServerConfiguration() != null) {
             context.updatePeerIds(snapshot.getServerConfiguration());
-
-            if (isMigratedSerializable(snapshot.getServerConfiguration())) {
-                hasMigratedDataRecovered = true;
-            }
         }
 
         timer.stop();
         log.info("Recovery snapshot applied for {} in {}: snapshotIndex={}, snapshotTerm={}, journal-size={}",
-                context.getId(), timer.toString(), replicatedLog().getSnapshotIndex(),
-                replicatedLog().getSnapshotTerm(), replicatedLog().size());
+                context.getId(), timer, replicatedLog().getSnapshotIndex(), replicatedLog().getSnapshotTerm(),
+                replicatedLog().size());
     }
 
-    private void onRecoveredJournalLogEntry(ReplicatedLogEntry logEntry) {
+    private void onRecoveredJournalLogEntry(final ReplicatedLogEntry logEntry) {
         if (log.isDebugEnabled()) {
             log.debug("{}: Received ReplicatedLogEntry for recovery: index: {}, size: {}", context.getId(),
                     logEntry.getIndex(), logEntry.size());
@@ -182,7 +184,7 @@ class RaftActorRecoverySupport {
         }
     }
 
-    private void onRecoveredApplyLogEntries(long toIndex) {
+    private void onRecoveredApplyLogEntries(final long toIndex) {
         if (!context.getPersistenceProvider().isRecoveryApplicable()) {
             dataRecoveredWithPersistenceDisabled = true;
             return;
@@ -203,6 +205,14 @@ class RaftActorRecoverySupport {
             if (logEntry != null) {
                 lastApplied++;
                 batchRecoveredLogEntry(logEntry);
+                if (shouldTakeRecoverySnapshot() && !context.getSnapshotManager().isCapturing()) {
+                    if (currentRecoveryBatchCount > 0) {
+                        endCurrentLogRecoveryBatch();
+                    }
+                    context.setLastApplied(lastApplied);
+                    context.setCommitIndex(lastApplied);
+                    takeRecoverySnapshot(logEntry);
+                }
             } else {
                 // Shouldn't happen but cover it anyway.
                 log.error("{}: Log entry not found for index {}", context.getId(), i);
@@ -214,7 +224,7 @@ class RaftActorRecoverySupport {
         context.setCommitIndex(lastApplied);
     }
 
-    private void onDeleteEntries(DeleteEntries deleteEntries) {
+    private void onDeleteEntries(final DeleteEntries deleteEntries) {
         if (context.getPersistenceProvider().isRecoveryApplicable()) {
             replicatedLog().removeFrom(deleteEntries.getFromIndex());
         } else {
@@ -222,8 +232,8 @@ class RaftActorRecoverySupport {
         }
     }
 
-    private void batchRecoveredLogEntry(ReplicatedLogEntry logEntry) {
-        initRecoveryTimer();
+    private void batchRecoveredLogEntry(final ReplicatedLogEntry logEntry) {
+        initRecoveryTimers();
 
         int batchSize = context.getConfigParams().getJournalRecoveryLogBatchSize();
         if (!isServerConfigurationPayload(logEntry)) {
@@ -239,27 +249,50 @@ class RaftActorRecoverySupport {
         }
     }
 
+    private void takeRecoverySnapshot(final ReplicatedLogEntry logEntry) {
+        log.info("Time for recovery snapshot on entry with index {}", logEntry.getIndex());
+        final SnapshotManager snapshotManager = context.getSnapshotManager();
+        if (snapshotManager.capture(logEntry, -1)) {
+            log.info("Capturing snapshot, resetting timer for the next recovery snapshot interval.");
+            this.recoverySnapshotTimer.reset().start();
+        } else {
+            log.info("SnapshotManager is not able to capture snapshot at this time. It will be retried "
+                + "again with the next recovered entry.");
+        }
+    }
+
+    private boolean shouldTakeRecoverySnapshot() {
+        return this.recoverySnapshotTimer != null && this.recoverySnapshotTimer.elapsed(TimeUnit.SECONDS)
+            >= context.getConfigParams().getRecoverySnapshotIntervalSeconds();
+    }
+
     private void endCurrentLogRecoveryBatch() {
         cohort.applyCurrentLogRecoveryBatch();
         currentRecoveryBatchCount = 0;
     }
 
-    private void onRecoveryCompletedMessage(PersistentDataProvider persistentProvider) {
+    private void onRecoveryCompletedMessage(final PersistentDataProvider persistentProvider) {
         if (currentRecoveryBatchCount > 0) {
             endCurrentLogRecoveryBatch();
         }
 
-        String recoveryTime = "";
+        final String recoveryTime;
         if (recoveryTimer != null) {
-            recoveryTimer.stop();
-            recoveryTime = " in " + recoveryTimer.toString();
+            recoveryTime = " in " + recoveryTimer.stop();
             recoveryTimer = null;
+        } else {
+            recoveryTime = "";
         }
 
-        log.info("Recovery completed" + recoveryTime + " - Switching actor to Follower - " + "Persistence Id =  "
-                  + context.getId() + " Last index in log = {}, snapshotIndex = {}, snapshotTerm = {}, "
-                  + "journal-size = {}", replicatedLog().lastIndex(), replicatedLog().getSnapshotIndex(),
-                 replicatedLog().getSnapshotTerm(), replicatedLog().size());
+        if (recoverySnapshotTimer != null) {
+            recoverySnapshotTimer.stop();
+            recoverySnapshotTimer = null;
+        }
+
+        log.info("{}: Recovery completed {} - Switching actor to Follower - last log index = {}, last log term = {}, "
+                + "snapshot index = {}, snapshot term = {}, journal size = {}", context.getId(), recoveryTime,
+                replicatedLog().lastIndex(), replicatedLog().lastTerm(), replicatedLog().getSnapshotIndex(),
+                replicatedLog().getSnapshotTerm(), replicatedLog().size());
 
         if (dataRecoveredWithPersistenceDisabled
                 || hasMigratedDataRecovered && !context.getPersistenceProvider().isRecoveryApplicable()) {
@@ -274,7 +307,8 @@ class RaftActorRecoverySupport {
             // messages. Either way, we persist a snapshot and delete all the messages from the akka journal
             // to clean out unwanted messages.
 
-            Snapshot snapshot = Snapshot.create(new byte[0], Collections.<ReplicatedLogEntry>emptyList(),
+            Snapshot snapshot = Snapshot.create(
+                    EmptyState.INSTANCE, Collections.<ReplicatedLogEntry>emptyList(),
                     -1, -1, -1, -1,
                     context.getTermInformation().getCurrentTerm(), context.getTermInformation().getVotedFor(),
                     context.getPeerServerInfo(true));
@@ -291,19 +325,19 @@ class RaftActorRecoverySupport {
         }
     }
 
-    private static boolean isServerConfigurationPayload(ReplicatedLogEntry repLogEntry) {
+    private static boolean isServerConfigurationPayload(final ReplicatedLogEntry repLogEntry) {
         return repLogEntry.getData() instanceof ServerConfigurationPayload;
     }
 
-    private static boolean isPersistentPayload(ReplicatedLogEntry repLogEntry) {
+    private static boolean isPersistentPayload(final ReplicatedLogEntry repLogEntry) {
         return repLogEntry.getData() instanceof PersistentPayload;
     }
 
-    private static boolean isMigratedPayload(ReplicatedLogEntry repLogEntry) {
+    private static boolean isMigratedPayload(final ReplicatedLogEntry repLogEntry) {
         return isMigratedSerializable(repLogEntry.getData());
     }
 
-    private static boolean isMigratedSerializable(Object message) {
+    private static boolean isMigratedSerializable(final Object message) {
         return message instanceof MigratedSerializable && ((MigratedSerializable)message).isMigrated();
     }
 }