Bug 7521: Convert DatastoreSnapshot.ShardSnapshot to store Snapshot
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / RaftActorRecoverySupport.java
index f0a7066d85eaba13ddd11d6f79876858c0395f58..17e3343804886b8ea9227d303cc6d75a25e59d06 100644 (file)
@@ -10,15 +10,15 @@ package org.opendaylight.controller.cluster.raft;
 import akka.persistence.RecoveryCompleted;
 import akka.persistence.SnapshotOffer;
 import com.google.common.base.Stopwatch;
-import java.io.ByteArrayInputStream;
-import java.io.ObjectInputStream;
 import java.util.Collections;
 import org.opendaylight.controller.cluster.PersistentDataProvider;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
 import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
 import org.opendaylight.controller.cluster.raft.persisted.DeleteEntries;
+import org.opendaylight.controller.cluster.raft.persisted.EmptyState;
 import org.opendaylight.controller.cluster.raft.persisted.MigratedSerializable;
 import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
+import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
 import org.opendaylight.controller.cluster.raft.persisted.UpdateElectionTerm;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.PersistentPayload;
 import org.slf4j.Logger;
@@ -79,7 +79,7 @@ class RaftActorRecoverySupport {
 
     @SuppressWarnings("checkstyle:IllegalCatch")
     private void possiblyRestoreFromSnapshot() {
-        byte[] restoreFromSnapshot = cohort.getRestoreFromSnapshot();
+        Snapshot restoreFromSnapshot = cohort.getRestoreFromSnapshot();
         if (restoreFromSnapshot == null) {
             return;
         }
@@ -90,15 +90,9 @@ class RaftActorRecoverySupport {
             return;
         }
 
-        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(restoreFromSnapshot))) {
-            Snapshot snapshot = (Snapshot) ois.readObject();
+        log.debug("{}: Restore snapshot: {}", context.getId(), restoreFromSnapshot);
 
-            log.debug("{}: Deserialized restore snapshot: {}", context.getId(), snapshot);
-
-            context.getSnapshotManager().apply(new ApplySnapshot(snapshot));
-        } catch (Exception e) {
-            log.error("{}: Error deserializing snapshot restore", context.getId(), e);
-        }
+        context.getSnapshotManager().apply(new ApplySnapshot(restoreFromSnapshot));
     }
 
     private ReplicatedLog replicatedLog() {
@@ -112,11 +106,23 @@ class RaftActorRecoverySupport {
     }
 
     private void onRecoveredSnapshot(SnapshotOffer offer) {
-        log.debug("{}: SnapshotOffer called..", context.getId());
+        log.debug("{}: SnapshotOffer called.", context.getId());
 
         initRecoveryTimer();
 
-        Snapshot snapshot = (Snapshot) offer.snapshot();
+        Object snapshotObj = offer.snapshot();
+        Snapshot snapshot;
+        if (snapshotObj instanceof org.opendaylight.controller.cluster.raft.Snapshot) {
+            org.opendaylight.controller.cluster.raft.Snapshot legacy =
+                    (org.opendaylight.controller.cluster.raft.Snapshot)snapshotObj;
+            snapshot = Snapshot.create(cohort.deserializePreCarbonSnapshot(legacy.getState()),
+                    legacy.getUnAppliedEntries(), legacy.getLastIndex(), legacy.getLastTerm(),
+                    legacy.getLastAppliedIndex(), legacy.getLastAppliedTerm(),
+                    legacy.getElectionTerm(), legacy.getElectionVotedFor(), legacy.getServerConfiguration());
+            hasMigratedDataRecovered = true;
+        } else {
+            snapshot = (Snapshot) offer.snapshot();
+        }
 
         for (ReplicatedLogEntry entry: snapshot.getUnAppliedEntries()) {
             if (isMigratedPayload(entry)) {
@@ -128,7 +134,8 @@ class RaftActorRecoverySupport {
             // We may have just transitioned to disabled and have a snapshot containing state data and/or log
             // entries - we don't want to preserve these, only the server config and election term info.
 
-            snapshot = Snapshot.create(new byte[0], Collections.emptyList(), -1, -1, -1, -1,
+            snapshot = Snapshot.create(
+                    EmptyState.INSTANCE, Collections.emptyList(), -1, -1, -1, -1,
                     snapshot.getElectionTerm(), snapshot.getElectionVotedFor(), snapshot.getServerConfiguration());
         }
 
@@ -144,7 +151,9 @@ class RaftActorRecoverySupport {
         Stopwatch timer = Stopwatch.createStarted();
 
         // Apply the snapshot to the actors state
-        cohort.applyRecoverySnapshot(snapshot.getState());
+        if (!(snapshot.getState() instanceof EmptyState)) {
+            cohort.applyRecoverySnapshot(snapshot.getState());
+        }
 
         if (snapshot.getServerConfiguration() != null) {
             context.updatePeerIds(snapshot.getServerConfiguration());
@@ -273,7 +282,8 @@ class RaftActorRecoverySupport {
             // messages. Either way, we persist a snapshot and delete all the messages from the akka journal
             // to clean out unwanted messages.
 
-            Snapshot snapshot = Snapshot.create(new byte[0], Collections.<ReplicatedLogEntry>emptyList(),
+            Snapshot snapshot = Snapshot.create(
+                    EmptyState.INSTANCE, Collections.<ReplicatedLogEntry>emptyList(),
                     -1, -1, -1, -1,
                     context.getTermInformation().getCurrentTerm(), context.getTermInformation().getVotedFor(),
                     context.getPeerServerInfo(true));