import akka.persistence.RecoveryCompleted;
import akka.persistence.SnapshotOffer;
import com.google.common.base.Stopwatch;
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.io.ObjectInputStream;
import java.util.Collections;
import org.opendaylight.controller.cluster.PersistentDataProvider;
import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
import org.opendaylight.controller.cluster.raft.persisted.DeleteEntries;
+import org.opendaylight.controller.cluster.raft.persisted.EmptyState;
import org.opendaylight.controller.cluster.raft.persisted.MigratedSerializable;
import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
+import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
import org.opendaylight.controller.cluster.raft.persisted.UpdateElectionTerm;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.PersistentPayload;
import org.slf4j.Logger;
@SuppressWarnings("checkstyle:IllegalCatch")
private void possiblyRestoreFromSnapshot() {
- byte[] restoreFromSnapshot = cohort.getRestoreFromSnapshot();
+ Snapshot restoreFromSnapshot = cohort.getRestoreFromSnapshot();
if (restoreFromSnapshot == null) {
return;
}
return;
}
- try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(restoreFromSnapshot))) {
- Snapshot snapshot = (Snapshot) ois.readObject();
+ log.debug("{}: Restore snapshot: {}", context.getId(), restoreFromSnapshot);
- log.debug("{}: Deserialized restore snapshot: {}", context.getId(), snapshot);
-
- context.getSnapshotManager().apply(new ApplySnapshot(snapshot));
- } catch (RuntimeException | ClassNotFoundException | IOException e) {
- log.error("{}: Error deserializing snapshot restore", context.getId(), e);
- }
+ context.getSnapshotManager().apply(new ApplySnapshot(restoreFromSnapshot));
}
private ReplicatedLog replicatedLog() {
}
private void onRecoveredSnapshot(SnapshotOffer offer) {
- log.debug("{}: SnapshotOffer called..", context.getId());
+ log.debug("{}: SnapshotOffer called.", context.getId());
initRecoveryTimer();
// We may have just transitioned to disabled and have a snapshot containing state data and/or log
// entries - we don't want to preserve these, only the server config and election term info.
- snapshot = Snapshot.create(new byte[0], Collections.emptyList(), -1, -1, -1, -1,
+ snapshot = Snapshot.create(
+ EmptyState.INSTANCE, Collections.emptyList(), -1, -1, -1, -1,
snapshot.getElectionTerm(), snapshot.getElectionVotedFor(), snapshot.getServerConfiguration());
}
Stopwatch timer = Stopwatch.createStarted();
// Apply the snapshot to the actors state
- cohort.applyRecoverySnapshot(snapshot.getState());
+ if (!(snapshot.getState() instanceof EmptyState)) {
+ cohort.applyRecoverySnapshot(snapshot.getState());
+ }
if (snapshot.getServerConfiguration() != null) {
context.updatePeerIds(snapshot.getServerConfiguration());
-
- if (isMigratedSerializable(snapshot.getServerConfiguration())) {
- hasMigratedDataRecovered = true;
- }
}
timer.stop();
// messages. Either way, we persist a snapshot and delete all the messages from the akka journal
// to clean out unwanted messages.
- Snapshot snapshot = Snapshot.create(new byte[0], Collections.<ReplicatedLogEntry>emptyList(),
+ Snapshot snapshot = Snapshot.create(
+ EmptyState.INSTANCE, Collections.<ReplicatedLogEntry>emptyList(),
-1, -1, -1, -1,
context.getTermInformation().getCurrentTerm(), context.getTermInformation().getVotedFor(),
context.getPeerServerInfo(true));