Bug 7521: Add FileBackedOutputStream and use for snapshot chunking
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / RaftActorRecoverySupport.java
index f0a7066d85eaba13ddd11d6f79876858c0395f58..df207670d9fb2625219a76729af2b4d0bea0b4c9 100644 (file)
@@ -11,14 +11,17 @@ import akka.persistence.RecoveryCompleted;
 import akka.persistence.SnapshotOffer;
 import com.google.common.base.Stopwatch;
 import java.io.ByteArrayInputStream;
+import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.util.Collections;
 import org.opendaylight.controller.cluster.PersistentDataProvider;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
 import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
 import org.opendaylight.controller.cluster.raft.persisted.DeleteEntries;
+import org.opendaylight.controller.cluster.raft.persisted.EmptyState;
 import org.opendaylight.controller.cluster.raft.persisted.MigratedSerializable;
 import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
+import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
 import org.opendaylight.controller.cluster.raft.persisted.UpdateElectionTerm;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.PersistentPayload;
 import org.slf4j.Logger;
@@ -96,7 +99,7 @@ class RaftActorRecoverySupport {
             log.debug("{}: Deserialized restore snapshot: {}", context.getId(), snapshot);
 
             context.getSnapshotManager().apply(new ApplySnapshot(snapshot));
-        } catch (Exception e) {
+        } catch (RuntimeException | ClassNotFoundException | IOException e) {
             log.error("{}: Error deserializing snapshot restore", context.getId(), e);
         }
     }
@@ -112,11 +115,23 @@ class RaftActorRecoverySupport {
     }
 
     private void onRecoveredSnapshot(SnapshotOffer offer) {
-        log.debug("{}: SnapshotOffer called..", context.getId());
+        log.debug("{}: SnapshotOffer called.", context.getId());
 
         initRecoveryTimer();
 
-        Snapshot snapshot = (Snapshot) offer.snapshot();
+        Object snapshotObj = offer.snapshot();
+        Snapshot snapshot;
+        if (snapshotObj instanceof org.opendaylight.controller.cluster.raft.Snapshot) {
+            org.opendaylight.controller.cluster.raft.Snapshot legacy =
+                    (org.opendaylight.controller.cluster.raft.Snapshot)snapshotObj;
+            snapshot = Snapshot.create(cohort.deserializePreCarbonSnapshot(legacy.getState()),
+                    legacy.getUnAppliedEntries(), legacy.getLastIndex(), legacy.getLastTerm(),
+                    legacy.getLastAppliedIndex(), legacy.getLastAppliedTerm(),
+                    legacy.getElectionTerm(), legacy.getElectionVotedFor(), legacy.getServerConfiguration());
+            hasMigratedDataRecovered = true;
+        } else {
+            snapshot = (Snapshot) offer.snapshot();
+        }
 
         for (ReplicatedLogEntry entry: snapshot.getUnAppliedEntries()) {
             if (isMigratedPayload(entry)) {
@@ -128,7 +143,8 @@ class RaftActorRecoverySupport {
             // We may have just transitioned to disabled and have a snapshot containing state data and/or log
             // entries - we don't want to preserve these, only the server config and election term info.
 
-            snapshot = Snapshot.create(new byte[0], Collections.emptyList(), -1, -1, -1, -1,
+            snapshot = Snapshot.create(
+                    EmptyState.INSTANCE, Collections.emptyList(), -1, -1, -1, -1,
                     snapshot.getElectionTerm(), snapshot.getElectionVotedFor(), snapshot.getServerConfiguration());
         }
 
@@ -144,7 +160,9 @@ class RaftActorRecoverySupport {
         Stopwatch timer = Stopwatch.createStarted();
 
         // Apply the snapshot to the actors state
-        cohort.applyRecoverySnapshot(snapshot.getState());
+        if (!(snapshot.getState() instanceof EmptyState)) {
+            cohort.applyRecoverySnapshot(snapshot.getState());
+        }
 
         if (snapshot.getServerConfiguration() != null) {
             context.updatePeerIds(snapshot.getServerConfiguration());
@@ -273,7 +291,8 @@ class RaftActorRecoverySupport {
             // messages. Either way, we persist a snapshot and delete all the messages from the akka journal
             // to clean out unwanted messages.
 
-            Snapshot snapshot = Snapshot.create(new byte[0], Collections.<ReplicatedLogEntry>emptyList(),
+            Snapshot snapshot = Snapshot.create(
+                    EmptyState.INSTANCE, Collections.<ReplicatedLogEntry>emptyList(),
                     -1, -1, -1, -1,
                     context.getTermInformation().getCurrentTerm(), context.getTermInformation().getVotedFor(),
                     context.getPeerServerInfo(true));