X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2FRaftActorRecoverySupport.java;h=a31bf4bf41002311c28352cfac8bdfa2dd768c55;hb=6751227ff9443018d75e3a99da5693230d23d82b;hp=d0217a6bc0d7223c0976609189a939050bae5e49;hpb=c9943f5bc72d4cde9356d3bd4cf73d36f4b2f754;p=controller.git diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorRecoverySupport.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorRecoverySupport.java index d0217a6bc0..a31bf4bf41 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorRecoverySupport.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorRecoverySupport.java @@ -10,24 +10,24 @@ package org.opendaylight.controller.cluster.raft; import akka.persistence.RecoveryCompleted; import akka.persistence.SnapshotOffer; import com.google.common.base.Stopwatch; -import java.io.ByteArrayInputStream; -import java.io.ObjectInputStream; import java.util.Collections; import org.opendaylight.controller.cluster.PersistentDataProvider; import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot; import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries; import org.opendaylight.controller.cluster.raft.persisted.DeleteEntries; +import org.opendaylight.controller.cluster.raft.persisted.EmptyState; import org.opendaylight.controller.cluster.raft.persisted.MigratedSerializable; import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload; +import org.opendaylight.controller.cluster.raft.persisted.Snapshot; import org.opendaylight.controller.cluster.raft.persisted.UpdateElectionTerm; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.PersistentPayload; import org.slf4j.Logger; + /** * Support class that handles persistence recovery for a RaftActor. * * @author Thomas Pantelis */ - class RaftActorRecoverySupport { private final RaftActorContext context; private final RaftActorRecoveryCohort cohort; @@ -77,8 +77,9 @@ class RaftActorRecoverySupport { return recoveryComplete; } + @SuppressWarnings("checkstyle:IllegalCatch") private void possiblyRestoreFromSnapshot() { - byte[] restoreFromSnapshot = cohort.getRestoreFromSnapshot(); + Snapshot restoreFromSnapshot = cohort.getRestoreFromSnapshot(); if (restoreFromSnapshot == null) { return; } @@ -89,15 +90,9 @@ class RaftActorRecoverySupport { return; } - try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(restoreFromSnapshot))) { - Snapshot snapshot = (Snapshot) ois.readObject(); - - log.debug("{}: Deserialized restore snapshot: {}", context.getId(), snapshot); + log.debug("{}: Restore snapshot: {}", context.getId(), restoreFromSnapshot); - context.getSnapshotManager().apply(new ApplySnapshot(snapshot)); - } catch (Exception e) { - log.error("{}: Error deserializing snapshot restore", context.getId(), e); - } + context.getSnapshotManager().apply(new ApplySnapshot(restoreFromSnapshot)); } private ReplicatedLog replicatedLog() { @@ -111,11 +106,23 @@ class RaftActorRecoverySupport { } private void onRecoveredSnapshot(SnapshotOffer offer) { - log.debug("{}: SnapshotOffer called..", context.getId()); + log.debug("{}: SnapshotOffer called.", context.getId()); initRecoveryTimer(); - Snapshot snapshot = (Snapshot) offer.snapshot(); + Object snapshotObj = offer.snapshot(); + Snapshot snapshot; + if (snapshotObj instanceof org.opendaylight.controller.cluster.raft.Snapshot) { + org.opendaylight.controller.cluster.raft.Snapshot legacy = + (org.opendaylight.controller.cluster.raft.Snapshot)snapshotObj; + snapshot = Snapshot.create(cohort.deserializePreCarbonSnapshot(legacy.getState()), + legacy.getUnAppliedEntries(), legacy.getLastIndex(), legacy.getLastTerm(), + legacy.getLastAppliedIndex(), legacy.getLastAppliedTerm(), + legacy.getElectionTerm(), legacy.getElectionVotedFor(), legacy.getServerConfiguration()); + hasMigratedDataRecovered = true; + } else { + snapshot = (Snapshot) offer.snapshot(); + } for (ReplicatedLogEntry entry: snapshot.getUnAppliedEntries()) { if (isMigratedPayload(entry)) { @@ -127,7 +134,8 @@ class RaftActorRecoverySupport { // We may have just transitioned to disabled and have a snapshot containing state data and/or log // entries - we don't want to preserve these, only the server config and election term info. - snapshot = Snapshot.create(new byte[0], Collections.emptyList(), -1, -1, -1, -1, + snapshot = Snapshot.create( + EmptyState.INSTANCE, Collections.emptyList(), -1, -1, -1, -1, snapshot.getElectionTerm(), snapshot.getElectionVotedFor(), snapshot.getServerConfiguration()); } @@ -143,14 +151,12 @@ class RaftActorRecoverySupport { Stopwatch timer = Stopwatch.createStarted(); // Apply the snapshot to the actors state - cohort.applyRecoverySnapshot(snapshot.getState()); + if (!(snapshot.getState() instanceof EmptyState)) { + cohort.applyRecoverySnapshot(snapshot.getState()); + } if (snapshot.getServerConfiguration() != null) { context.updatePeerIds(snapshot.getServerConfiguration()); - - if (isMigratedSerializable(snapshot.getServerConfiguration())) { - hasMigratedDataRecovered = true; - } } timer.stop(); @@ -272,7 +278,8 @@ class RaftActorRecoverySupport { // messages. Either way, we persist a snapshot and delete all the messages from the akka journal // to clean out unwanted messages. - Snapshot snapshot = Snapshot.create(new byte[0], Collections.emptyList(), + Snapshot snapshot = Snapshot.create( + EmptyState.INSTANCE, Collections.emptyList(), -1, -1, -1, -1, context.getTermInformation().getCurrentTerm(), context.getTermInformation().getVotedFor(), context.getPeerServerInfo(true));