import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
import org.opendaylight.controller.cluster.raft.persisted.DeleteEntries;
+import org.opendaylight.controller.cluster.raft.persisted.EmptyState;
import org.opendaylight.controller.cluster.raft.persisted.MigratedSerializable;
import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
+import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
import org.opendaylight.controller.cluster.raft.persisted.UpdateElectionTerm;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.PersistentPayload;
import org.slf4j.Logger;
}
private void onRecoveredSnapshot(SnapshotOffer offer) {
- log.debug("{}: SnapshotOffer called..", context.getId());
+ log.debug("{}: SnapshotOffer called.", context.getId());
initRecoveryTimer();
- Snapshot snapshot = (Snapshot) offer.snapshot();
+ Object snapshotObj = offer.snapshot();
+ Snapshot snapshot;
+ if (snapshotObj instanceof org.opendaylight.controller.cluster.raft.Snapshot) {
+ org.opendaylight.controller.cluster.raft.Snapshot legacy =
+ (org.opendaylight.controller.cluster.raft.Snapshot)snapshotObj;
+ snapshot = Snapshot.create(cohort.deserializePreCarbonSnapshot(legacy.getState()),
+ legacy.getUnAppliedEntries(), legacy.getLastIndex(), legacy.getLastTerm(),
+ legacy.getLastAppliedIndex(), legacy.getLastAppliedTerm(),
+ legacy.getElectionTerm(), legacy.getElectionVotedFor(), legacy.getServerConfiguration());
+ hasMigratedDataRecovered = true;
+ } else {
+ snapshot = (Snapshot) offer.snapshot();
+ }
for (ReplicatedLogEntry entry: snapshot.getUnAppliedEntries()) {
if (isMigratedPayload(entry)) {
// We may have just transitioned to disabled and have a snapshot containing state data and/or log
// entries - we don't want to preserve these, only the server config and election term info.
- snapshot = Snapshot.create(new byte[0], Collections.emptyList(), -1, -1, -1, -1,
+ snapshot = Snapshot.create(
+ EmptyState.INSTANCE, Collections.emptyList(), -1, -1, -1, -1,
snapshot.getElectionTerm(), snapshot.getElectionVotedFor(), snapshot.getServerConfiguration());
}
Stopwatch timer = Stopwatch.createStarted();
// Apply the snapshot to the actors state
- cohort.applyRecoverySnapshot(snapshot.getState());
+ if (!(snapshot.getState() instanceof EmptyState)) {
+ cohort.applyRecoverySnapshot(snapshot.getState());
+ }
if (snapshot.getServerConfiguration() != null) {
context.updatePeerIds(snapshot.getServerConfiguration());
// messages. Either way, we persist a snapshot and delete all the messages from the akka journal
// to clean out unwanted messages.
- Snapshot snapshot = Snapshot.create(new byte[0], Collections.<ReplicatedLogEntry>emptyList(),
+ Snapshot snapshot = Snapshot.create(
+ EmptyState.INSTANCE, Collections.<ReplicatedLogEntry>emptyList(),
-1, -1, -1, -1,
context.getTermInformation().getCurrentTerm(), context.getTermInformation().getVotedFor(),
context.getPeerServerInfo(true));