import akka.persistence.RecoveryCompleted;
import akka.persistence.SnapshotOffer;
import com.google.common.base.Stopwatch;
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.io.ObjectInputStream;
import java.util.Collections;
import org.opendaylight.controller.cluster.PersistentDataProvider;
import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
@SuppressWarnings("checkstyle:IllegalCatch")
private void possiblyRestoreFromSnapshot() {
- byte[] restoreFromSnapshot = cohort.getRestoreFromSnapshot();
+ Snapshot restoreFromSnapshot = cohort.getRestoreFromSnapshot();
if (restoreFromSnapshot == null) {
return;
}
return;
}
- try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(restoreFromSnapshot))) {
- Snapshot snapshot = (Snapshot) ois.readObject();
+ log.debug("{}: Restore snapshot: {}", context.getId(), restoreFromSnapshot);
- log.debug("{}: Deserialized restore snapshot: {}", context.getId(), snapshot);
-
- context.getSnapshotManager().apply(new ApplySnapshot(snapshot));
- } catch (RuntimeException | ClassNotFoundException | IOException e) {
- log.error("{}: Error deserializing snapshot restore", context.getId(), e);
- }
+ context.getSnapshotManager().apply(new ApplySnapshot(restoreFromSnapshot));
}
private ReplicatedLog replicatedLog() {
initRecoveryTimer();
- Object snapshotObj = offer.snapshot();
- Snapshot snapshot;
- if (snapshotObj instanceof org.opendaylight.controller.cluster.raft.Snapshot) {
- org.opendaylight.controller.cluster.raft.Snapshot legacy =
- (org.opendaylight.controller.cluster.raft.Snapshot)snapshotObj;
- snapshot = Snapshot.create(cohort.deserializePreCarbonSnapshot(legacy.getState()),
- legacy.getUnAppliedEntries(), legacy.getLastIndex(), legacy.getLastTerm(),
- legacy.getLastAppliedIndex(), legacy.getLastAppliedTerm(),
- legacy.getElectionTerm(), legacy.getElectionVotedFor(), legacy.getServerConfiguration());
- hasMigratedDataRecovered = true;
- } else {
- snapshot = (Snapshot) offer.snapshot();
- }
+ Snapshot snapshot = (Snapshot) offer.snapshot();
for (ReplicatedLogEntry entry: snapshot.getUnAppliedEntries()) {
if (isMigratedPayload(entry)) {
if (snapshot.getServerConfiguration() != null) {
context.updatePeerIds(snapshot.getServerConfiguration());
-
- if (isMigratedSerializable(snapshot.getServerConfiguration())) {
- hasMigratedDataRecovered = true;
- }
}
timer.stop();