Bug 7521: Convert Snapshot to store a State instance
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / RaftActorRecoverySupport.java
index 5e4e6571a10d3d1f998796849a9be7486d86de75..df207670d9fb2625219a76729af2b4d0bea0b4c9 100644 (file)
@@ -18,8 +18,10 @@ import org.opendaylight.controller.cluster.PersistentDataProvider;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
 import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
 import org.opendaylight.controller.cluster.raft.persisted.DeleteEntries;
+import org.opendaylight.controller.cluster.raft.persisted.EmptyState;
 import org.opendaylight.controller.cluster.raft.persisted.MigratedSerializable;
 import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
+import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
 import org.opendaylight.controller.cluster.raft.persisted.UpdateElectionTerm;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.PersistentPayload;
 import org.slf4j.Logger;
@@ -113,11 +115,23 @@ class RaftActorRecoverySupport {
     }
 
     private void onRecoveredSnapshot(SnapshotOffer offer) {
-        log.debug("{}: SnapshotOffer called..", context.getId());
+        log.debug("{}: SnapshotOffer called.", context.getId());
 
         initRecoveryTimer();
 
-        Snapshot snapshot = (Snapshot) offer.snapshot();
+        Object snapshotObj = offer.snapshot();
+        Snapshot snapshot;
+        if (snapshotObj instanceof org.opendaylight.controller.cluster.raft.Snapshot) {
+            org.opendaylight.controller.cluster.raft.Snapshot legacy =
+                    (org.opendaylight.controller.cluster.raft.Snapshot)snapshotObj;
+            snapshot = Snapshot.create(cohort.deserializePreCarbonSnapshot(legacy.getState()),
+                    legacy.getUnAppliedEntries(), legacy.getLastIndex(), legacy.getLastTerm(),
+                    legacy.getLastAppliedIndex(), legacy.getLastAppliedTerm(),
+                    legacy.getElectionTerm(), legacy.getElectionVotedFor(), legacy.getServerConfiguration());
+            hasMigratedDataRecovered = true;
+        } else {
+            snapshot = (Snapshot) offer.snapshot();
+        }
 
         for (ReplicatedLogEntry entry: snapshot.getUnAppliedEntries()) {
             if (isMigratedPayload(entry)) {
@@ -129,7 +143,8 @@ class RaftActorRecoverySupport {
             // We may have just transitioned to disabled and have a snapshot containing state data and/or log
             // entries - we don't want to preserve these, only the server config and election term info.
 
-            snapshot = Snapshot.create(new byte[0], Collections.emptyList(), -1, -1, -1, -1,
+            snapshot = Snapshot.create(
+                    EmptyState.INSTANCE, Collections.emptyList(), -1, -1, -1, -1,
                     snapshot.getElectionTerm(), snapshot.getElectionVotedFor(), snapshot.getServerConfiguration());
         }
 
@@ -145,7 +160,9 @@ class RaftActorRecoverySupport {
         Stopwatch timer = Stopwatch.createStarted();
 
         // Apply the snapshot to the actors state
-        cohort.applyRecoverySnapshot(snapshot.getState());
+        if (!(snapshot.getState() instanceof EmptyState)) {
+            cohort.applyRecoverySnapshot(snapshot.getState());
+        }
 
         if (snapshot.getServerConfiguration() != null) {
             context.updatePeerIds(snapshot.getServerConfiguration());
@@ -274,7 +291,8 @@ class RaftActorRecoverySupport {
             // messages. Either way, we persist a snapshot and delete all the messages from the akka journal
             // to clean out unwanted messages.
 
-            Snapshot snapshot = Snapshot.create(new byte[0], Collections.<ReplicatedLogEntry>emptyList(),
+            Snapshot snapshot = Snapshot.create(
+                    EmptyState.INSTANCE, Collections.<ReplicatedLogEntry>emptyList(),
                     -1, -1, -1, -1,
                     context.getTermInformation().getCurrentTerm(), context.getTermInformation().getVotedFor(),
                     context.getPeerServerInfo(true));