Bump persisted PayloadVersion
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / RaftActorRecoverySupport.java
index d0217a6bc0d7223c0976609189a939050bae5e49..283aa8f142087c2421f58d7a116351652c58575f 100644 (file)
@@ -10,24 +10,25 @@ package org.opendaylight.controller.cluster.raft;
 import akka.persistence.RecoveryCompleted;
 import akka.persistence.SnapshotOffer;
 import com.google.common.base.Stopwatch;
-import java.io.ByteArrayInputStream;
-import java.io.ObjectInputStream;
 import java.util.Collections;
 import org.opendaylight.controller.cluster.PersistentDataProvider;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
 import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
 import org.opendaylight.controller.cluster.raft.persisted.DeleteEntries;
+import org.opendaylight.controller.cluster.raft.persisted.EmptyState;
 import org.opendaylight.controller.cluster.raft.persisted.MigratedSerializable;
 import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
+import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
+import org.opendaylight.controller.cluster.raft.persisted.Snapshot.State;
 import org.opendaylight.controller.cluster.raft.persisted.UpdateElectionTerm;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.PersistentPayload;
 import org.slf4j.Logger;
+
 /**
  * Support class that handles persistence recovery for a RaftActor.
  *
  * @author Thomas Pantelis
  */
-
 class RaftActorRecoverySupport {
     private final RaftActorContext context;
     private final RaftActorRecoveryCohort cohort;
@@ -46,7 +47,7 @@ class RaftActorRecoverySupport {
         this.log = context.getLogger();
     }
 
-    boolean handleRecoveryMessage(Object message, PersistentDataProvider persistentProvider) {
+    boolean handleRecoveryMessage(final Object message, final PersistentDataProvider persistentProvider) {
         log.trace("{}: handleRecoveryMessage: {}", context.getId(), message);
 
         anyDataRecovered = anyDataRecovered || !(message instanceof RecoveryCompleted);
@@ -77,8 +78,9 @@ class RaftActorRecoverySupport {
         return recoveryComplete;
     }
 
+    @SuppressWarnings("checkstyle:IllegalCatch")
     private void possiblyRestoreFromSnapshot() {
-        byte[] restoreFromSnapshot = cohort.getRestoreFromSnapshot();
+        Snapshot restoreFromSnapshot = cohort.getRestoreFromSnapshot();
         if (restoreFromSnapshot == null) {
             return;
         }
@@ -89,15 +91,9 @@ class RaftActorRecoverySupport {
             return;
         }
 
-        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(restoreFromSnapshot))) {
-            Snapshot snapshot = (Snapshot) ois.readObject();
-
-            log.debug("{}: Deserialized restore snapshot: {}", context.getId(), snapshot);
+        log.debug("{}: Restore snapshot: {}", context.getId(), restoreFromSnapshot);
 
-            context.getSnapshotManager().apply(new ApplySnapshot(snapshot));
-        } catch (Exception e) {
-            log.error("{}: Error deserializing snapshot restore", context.getId(), e);
-        }
+        context.getSnapshotManager().apply(new ApplySnapshot(restoreFromSnapshot));
     }
 
     private ReplicatedLog replicatedLog() {
@@ -110,8 +106,8 @@ class RaftActorRecoverySupport {
         }
     }
 
-    private void onRecoveredSnapshot(SnapshotOffer offer) {
-        log.debug("{}: SnapshotOffer called..", context.getId());
+    private void onRecoveredSnapshot(final SnapshotOffer offer) {
+        log.debug("{}: SnapshotOffer called.", context.getId());
 
         initRecoveryTimer();
 
@@ -127,7 +123,8 @@ class RaftActorRecoverySupport {
             // We may have just transitioned to disabled and have a snapshot containing state data and/or log
             // entries - we don't want to preserve these, only the server config and election term info.
 
-            snapshot = Snapshot.create(new byte[0], Collections.emptyList(), -1, -1, -1, -1,
+            snapshot = Snapshot.create(
+                    EmptyState.INSTANCE, Collections.emptyList(), -1, -1, -1, -1,
                     snapshot.getElectionTerm(), snapshot.getElectionVotedFor(), snapshot.getServerConfiguration());
         }
 
@@ -140,26 +137,28 @@ class RaftActorRecoverySupport {
         context.setCommitIndex(snapshot.getLastAppliedIndex());
         context.getTermInformation().update(snapshot.getElectionTerm(), snapshot.getElectionVotedFor());
 
-        Stopwatch timer = Stopwatch.createStarted();
+        final Stopwatch timer = Stopwatch.createStarted();
 
         // Apply the snapshot to the actors state
-        cohort.applyRecoverySnapshot(snapshot.getState());
+        final State snapshotState = snapshot.getState();
+        if (snapshotState.needsMigration()) {
+            hasMigratedDataRecovered = true;
+        }
+        if (!(snapshotState instanceof EmptyState)) {
+            cohort.applyRecoverySnapshot(snapshotState);
+        }
 
         if (snapshot.getServerConfiguration() != null) {
             context.updatePeerIds(snapshot.getServerConfiguration());
-
-            if (isMigratedSerializable(snapshot.getServerConfiguration())) {
-                hasMigratedDataRecovered = true;
-            }
         }
 
         timer.stop();
         log.info("Recovery snapshot applied for {} in {}: snapshotIndex={}, snapshotTerm={}, journal-size={}",
-                context.getId(), timer.toString(), replicatedLog().getSnapshotIndex(),
-                replicatedLog().getSnapshotTerm(), replicatedLog().size());
+                context.getId(), timer, replicatedLog().getSnapshotIndex(), replicatedLog().getSnapshotTerm(),
+                replicatedLog().size());
     }
 
-    private void onRecoveredJournalLogEntry(ReplicatedLogEntry logEntry) {
+    private void onRecoveredJournalLogEntry(final ReplicatedLogEntry logEntry) {
         if (log.isDebugEnabled()) {
             log.debug("{}: Received ReplicatedLogEntry for recovery: index: {}, size: {}", context.getId(),
                     logEntry.getIndex(), logEntry.size());
@@ -180,7 +179,7 @@ class RaftActorRecoverySupport {
         }
     }
 
-    private void onRecoveredApplyLogEntries(long toIndex) {
+    private void onRecoveredApplyLogEntries(final long toIndex) {
         if (!context.getPersistenceProvider().isRecoveryApplicable()) {
             dataRecoveredWithPersistenceDisabled = true;
             return;
@@ -212,7 +211,7 @@ class RaftActorRecoverySupport {
         context.setCommitIndex(lastApplied);
     }
 
-    private void onDeleteEntries(DeleteEntries deleteEntries) {
+    private void onDeleteEntries(final DeleteEntries deleteEntries) {
         if (context.getPersistenceProvider().isRecoveryApplicable()) {
             replicatedLog().removeFrom(deleteEntries.getFromIndex());
         } else {
@@ -220,7 +219,7 @@ class RaftActorRecoverySupport {
         }
     }
 
-    private void batchRecoveredLogEntry(ReplicatedLogEntry logEntry) {
+    private void batchRecoveredLogEntry(final ReplicatedLogEntry logEntry) {
         initRecoveryTimer();
 
         int batchSize = context.getConfigParams().getJournalRecoveryLogBatchSize();
@@ -242,7 +241,7 @@ class RaftActorRecoverySupport {
         currentRecoveryBatchCount = 0;
     }
 
-    private void onRecoveryCompletedMessage(PersistentDataProvider persistentProvider) {
+    private void onRecoveryCompletedMessage(final PersistentDataProvider persistentProvider) {
         if (currentRecoveryBatchCount > 0) {
             endCurrentLogRecoveryBatch();
         }
@@ -254,10 +253,10 @@ class RaftActorRecoverySupport {
             recoveryTimer = null;
         }
 
-        log.info("Recovery completed" + recoveryTime + " - Switching actor to Follower - " + "Persistence Id =  "
-                  + context.getId() + " Last index in log = {}, snapshotIndex = {}, snapshotTerm = {}, "
-                  + "journal-size = {}", replicatedLog().lastIndex(), replicatedLog().getSnapshotIndex(),
-                 replicatedLog().getSnapshotTerm(), replicatedLog().size());
+        log.info("{}: Recovery completed {} - Switching actor to Follower - last log index = {}, last log term = {}, "
+                + "snapshot index = {}, snapshot term = {}, journal size = {}", context.getId(), recoveryTime,
+                replicatedLog().lastIndex(), replicatedLog().lastTerm(), replicatedLog().getSnapshotIndex(),
+                replicatedLog().getSnapshotTerm(), replicatedLog().size());
 
         if (dataRecoveredWithPersistenceDisabled
                 || hasMigratedDataRecovered && !context.getPersistenceProvider().isRecoveryApplicable()) {
@@ -272,7 +271,8 @@ class RaftActorRecoverySupport {
             // messages. Either way, we persist a snapshot and delete all the messages from the akka journal
             // to clean out unwanted messages.
 
-            Snapshot snapshot = Snapshot.create(new byte[0], Collections.<ReplicatedLogEntry>emptyList(),
+            Snapshot snapshot = Snapshot.create(
+                    EmptyState.INSTANCE, Collections.<ReplicatedLogEntry>emptyList(),
                     -1, -1, -1, -1,
                     context.getTermInformation().getCurrentTerm(), context.getTermInformation().getVotedFor(),
                     context.getPeerServerInfo(true));
@@ -289,19 +289,19 @@ class RaftActorRecoverySupport {
         }
     }
 
-    private static boolean isServerConfigurationPayload(ReplicatedLogEntry repLogEntry) {
+    private static boolean isServerConfigurationPayload(final ReplicatedLogEntry repLogEntry) {
         return repLogEntry.getData() instanceof ServerConfigurationPayload;
     }
 
-    private static boolean isPersistentPayload(ReplicatedLogEntry repLogEntry) {
+    private static boolean isPersistentPayload(final ReplicatedLogEntry repLogEntry) {
         return repLogEntry.getData() instanceof PersistentPayload;
     }
 
-    private static boolean isMigratedPayload(ReplicatedLogEntry repLogEntry) {
+    private static boolean isMigratedPayload(final ReplicatedLogEntry repLogEntry) {
         return isMigratedSerializable(repLogEntry.getData());
     }
 
-    private static boolean isMigratedSerializable(Object message) {
+    private static boolean isMigratedSerializable(final Object message) {
         return message instanceof MigratedSerializable && ((MigratedSerializable)message).isMigrated();
     }
 }