Bump persisted PayloadVersion
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / RaftActorRecoverySupport.java
index 1b9343c1f3ff569b378c379f3b0ef1c91c7773d3..283aa8f142087c2421f58d7a116351652c58575f 100644 (file)
@@ -19,6 +19,7 @@ import org.opendaylight.controller.cluster.raft.persisted.EmptyState;
 import org.opendaylight.controller.cluster.raft.persisted.MigratedSerializable;
 import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
 import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
+import org.opendaylight.controller.cluster.raft.persisted.Snapshot.State;
 import org.opendaylight.controller.cluster.raft.persisted.UpdateElectionTerm;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.PersistentPayload;
 import org.slf4j.Logger;
@@ -46,7 +47,7 @@ class RaftActorRecoverySupport {
         this.log = context.getLogger();
     }
 
-    boolean handleRecoveryMessage(Object message, PersistentDataProvider persistentProvider) {
+    boolean handleRecoveryMessage(final Object message, final PersistentDataProvider persistentProvider) {
         log.trace("{}: handleRecoveryMessage: {}", context.getId(), message);
 
         anyDataRecovered = anyDataRecovered || !(message instanceof RecoveryCompleted);
@@ -105,7 +106,7 @@ class RaftActorRecoverySupport {
         }
     }
 
-    private void onRecoveredSnapshot(SnapshotOffer offer) {
+    private void onRecoveredSnapshot(final SnapshotOffer offer) {
         log.debug("{}: SnapshotOffer called.", context.getId());
 
         initRecoveryTimer();
@@ -136,11 +137,15 @@ class RaftActorRecoverySupport {
         context.setCommitIndex(snapshot.getLastAppliedIndex());
         context.getTermInformation().update(snapshot.getElectionTerm(), snapshot.getElectionVotedFor());
 
-        Stopwatch timer = Stopwatch.createStarted();
+        final Stopwatch timer = Stopwatch.createStarted();
 
         // Apply the snapshot to the actors state
-        if (!(snapshot.getState() instanceof EmptyState)) {
-            cohort.applyRecoverySnapshot(snapshot.getState());
+        final State snapshotState = snapshot.getState();
+        if (snapshotState.needsMigration()) {
+            hasMigratedDataRecovered = true;
+        }
+        if (!(snapshotState instanceof EmptyState)) {
+            cohort.applyRecoverySnapshot(snapshotState);
         }
 
         if (snapshot.getServerConfiguration() != null) {
@@ -149,11 +154,11 @@ class RaftActorRecoverySupport {
 
         timer.stop();
         log.info("Recovery snapshot applied for {} in {}: snapshotIndex={}, snapshotTerm={}, journal-size={}",
-                context.getId(), timer.toString(), replicatedLog().getSnapshotIndex(),
-                replicatedLog().getSnapshotTerm(), replicatedLog().size());
+                context.getId(), timer, replicatedLog().getSnapshotIndex(), replicatedLog().getSnapshotTerm(),
+                replicatedLog().size());
     }
 
-    private void onRecoveredJournalLogEntry(ReplicatedLogEntry logEntry) {
+    private void onRecoveredJournalLogEntry(final ReplicatedLogEntry logEntry) {
         if (log.isDebugEnabled()) {
             log.debug("{}: Received ReplicatedLogEntry for recovery: index: {}, size: {}", context.getId(),
                     logEntry.getIndex(), logEntry.size());
@@ -174,7 +179,7 @@ class RaftActorRecoverySupport {
         }
     }
 
-    private void onRecoveredApplyLogEntries(long toIndex) {
+    private void onRecoveredApplyLogEntries(final long toIndex) {
         if (!context.getPersistenceProvider().isRecoveryApplicable()) {
             dataRecoveredWithPersistenceDisabled = true;
             return;
@@ -206,7 +211,7 @@ class RaftActorRecoverySupport {
         context.setCommitIndex(lastApplied);
     }
 
-    private void onDeleteEntries(DeleteEntries deleteEntries) {
+    private void onDeleteEntries(final DeleteEntries deleteEntries) {
         if (context.getPersistenceProvider().isRecoveryApplicable()) {
             replicatedLog().removeFrom(deleteEntries.getFromIndex());
         } else {
@@ -214,7 +219,7 @@ class RaftActorRecoverySupport {
         }
     }
 
-    private void batchRecoveredLogEntry(ReplicatedLogEntry logEntry) {
+    private void batchRecoveredLogEntry(final ReplicatedLogEntry logEntry) {
         initRecoveryTimer();
 
         int batchSize = context.getConfigParams().getJournalRecoveryLogBatchSize();
@@ -236,7 +241,7 @@ class RaftActorRecoverySupport {
         currentRecoveryBatchCount = 0;
     }
 
-    private void onRecoveryCompletedMessage(PersistentDataProvider persistentProvider) {
+    private void onRecoveryCompletedMessage(final PersistentDataProvider persistentProvider) {
         if (currentRecoveryBatchCount > 0) {
             endCurrentLogRecoveryBatch();
         }
@@ -248,10 +253,10 @@ class RaftActorRecoverySupport {
             recoveryTimer = null;
         }
 
-        log.info("Recovery completed" + recoveryTime + " - Switching actor to Follower - " + "Persistence Id =  "
-                  + context.getId() + " Last index in log = {}, snapshotIndex = {}, snapshotTerm = {}, "
-                  + "journal-size = {}", replicatedLog().lastIndex(), replicatedLog().getSnapshotIndex(),
-                 replicatedLog().getSnapshotTerm(), replicatedLog().size());
+        log.info("{}: Recovery completed {} - Switching actor to Follower - last log index = {}, last log term = {}, "
+                + "snapshot index = {}, snapshot term = {}, journal size = {}", context.getId(), recoveryTime,
+                replicatedLog().lastIndex(), replicatedLog().lastTerm(), replicatedLog().getSnapshotIndex(),
+                replicatedLog().getSnapshotTerm(), replicatedLog().size());
 
         if (dataRecoveredWithPersistenceDisabled
                 || hasMigratedDataRecovered && !context.getPersistenceProvider().isRecoveryApplicable()) {
@@ -284,19 +289,19 @@ class RaftActorRecoverySupport {
         }
     }
 
-    private static boolean isServerConfigurationPayload(ReplicatedLogEntry repLogEntry) {
+    private static boolean isServerConfigurationPayload(final ReplicatedLogEntry repLogEntry) {
         return repLogEntry.getData() instanceof ServerConfigurationPayload;
     }
 
-    private static boolean isPersistentPayload(ReplicatedLogEntry repLogEntry) {
+    private static boolean isPersistentPayload(final ReplicatedLogEntry repLogEntry) {
         return repLogEntry.getData() instanceof PersistentPayload;
     }
 
-    private static boolean isMigratedPayload(ReplicatedLogEntry repLogEntry) {
+    private static boolean isMigratedPayload(final ReplicatedLogEntry repLogEntry) {
         return isMigratedSerializable(repLogEntry.getData());
     }
 
-    private static boolean isMigratedSerializable(Object message) {
+    private static boolean isMigratedSerializable(final Object message) {
         return message instanceof MigratedSerializable && ((MigratedSerializable)message).isMigrated();
     }
 }