X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2FRaftActorRecoverySupport.java;h=17e3343804886b8ea9227d303cc6d75a25e59d06;hp=f0a7066d85eaba13ddd11d6f79876858c0395f58;hb=aafb8cb044e992dd784d1f4f66508599cc4cd588;hpb=b0f8283587b5cc8573d29f66219cbe7f70e21e1b diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorRecoverySupport.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorRecoverySupport.java index f0a7066d85..17e3343804 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorRecoverySupport.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorRecoverySupport.java @@ -10,15 +10,15 @@ package org.opendaylight.controller.cluster.raft; import akka.persistence.RecoveryCompleted; import akka.persistence.SnapshotOffer; import com.google.common.base.Stopwatch; -import java.io.ByteArrayInputStream; -import java.io.ObjectInputStream; import java.util.Collections; import org.opendaylight.controller.cluster.PersistentDataProvider; import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot; import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries; import org.opendaylight.controller.cluster.raft.persisted.DeleteEntries; +import org.opendaylight.controller.cluster.raft.persisted.EmptyState; import org.opendaylight.controller.cluster.raft.persisted.MigratedSerializable; import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload; +import org.opendaylight.controller.cluster.raft.persisted.Snapshot; import org.opendaylight.controller.cluster.raft.persisted.UpdateElectionTerm; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.PersistentPayload; import org.slf4j.Logger; @@ -79,7 +79,7 @@ class RaftActorRecoverySupport { @SuppressWarnings("checkstyle:IllegalCatch") private void possiblyRestoreFromSnapshot() { - byte[] restoreFromSnapshot = cohort.getRestoreFromSnapshot(); + Snapshot restoreFromSnapshot = cohort.getRestoreFromSnapshot(); if (restoreFromSnapshot == null) { return; } @@ -90,15 +90,9 @@ class RaftActorRecoverySupport { return; } - try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(restoreFromSnapshot))) { - Snapshot snapshot = (Snapshot) ois.readObject(); + log.debug("{}: Restore snapshot: {}", context.getId(), restoreFromSnapshot); - log.debug("{}: Deserialized restore snapshot: {}", context.getId(), snapshot); - - context.getSnapshotManager().apply(new ApplySnapshot(snapshot)); - } catch (Exception e) { - log.error("{}: Error deserializing snapshot restore", context.getId(), e); - } + context.getSnapshotManager().apply(new ApplySnapshot(restoreFromSnapshot)); } private ReplicatedLog replicatedLog() { @@ -112,11 +106,23 @@ class RaftActorRecoverySupport { } private void onRecoveredSnapshot(SnapshotOffer offer) { - log.debug("{}: SnapshotOffer called..", context.getId()); + log.debug("{}: SnapshotOffer called.", context.getId()); initRecoveryTimer(); - Snapshot snapshot = (Snapshot) offer.snapshot(); + Object snapshotObj = offer.snapshot(); + Snapshot snapshot; + if (snapshotObj instanceof org.opendaylight.controller.cluster.raft.Snapshot) { + org.opendaylight.controller.cluster.raft.Snapshot legacy = + (org.opendaylight.controller.cluster.raft.Snapshot)snapshotObj; + snapshot = Snapshot.create(cohort.deserializePreCarbonSnapshot(legacy.getState()), + legacy.getUnAppliedEntries(), legacy.getLastIndex(), legacy.getLastTerm(), + legacy.getLastAppliedIndex(), legacy.getLastAppliedTerm(), + legacy.getElectionTerm(), legacy.getElectionVotedFor(), legacy.getServerConfiguration()); + hasMigratedDataRecovered = true; + } else { + snapshot = (Snapshot) offer.snapshot(); + } for (ReplicatedLogEntry entry: snapshot.getUnAppliedEntries()) { if (isMigratedPayload(entry)) { @@ -128,7 +134,8 @@ class RaftActorRecoverySupport { // We may have just transitioned to disabled and have a snapshot containing state data and/or log // entries - we don't want to preserve these, only the server config and election term info. - snapshot = Snapshot.create(new byte[0], Collections.emptyList(), -1, -1, -1, -1, + snapshot = Snapshot.create( + EmptyState.INSTANCE, Collections.emptyList(), -1, -1, -1, -1, snapshot.getElectionTerm(), snapshot.getElectionVotedFor(), snapshot.getServerConfiguration()); } @@ -144,7 +151,9 @@ class RaftActorRecoverySupport { Stopwatch timer = Stopwatch.createStarted(); // Apply the snapshot to the actors state - cohort.applyRecoverySnapshot(snapshot.getState()); + if (!(snapshot.getState() instanceof EmptyState)) { + cohort.applyRecoverySnapshot(snapshot.getState()); + } if (snapshot.getServerConfiguration() != null) { context.updatePeerIds(snapshot.getServerConfiguration()); @@ -273,7 +282,8 @@ class RaftActorRecoverySupport { // messages. Either way, we persist a snapshot and delete all the messages from the akka journal // to clean out unwanted messages. - Snapshot snapshot = Snapshot.create(new byte[0], Collections.emptyList(), + Snapshot snapshot = Snapshot.create( + EmptyState.INSTANCE, Collections.emptyList(), -1, -1, -1, -1, context.getTermInformation().getCurrentTerm(), context.getTermInformation().getVotedFor(), context.getPeerServerInfo(true));