import akka.persistence.SnapshotOffer;
import akka.persistence.SnapshotSelectionCriteria;
import com.google.common.base.Stopwatch;
+import java.io.ByteArrayInputStream;
+import java.io.ObjectInputStream;
import org.opendaylight.controller.cluster.DataPersistenceProvider;
import org.opendaylight.controller.cluster.PersistentDataProvider;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
+import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
import org.opendaylight.controller.cluster.raft.base.messages.DeleteEntries;
import org.opendaylight.controller.cluster.raft.base.messages.UpdateElectionTerm;
import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
private int currentRecoveryBatchCount;
private boolean dataRecoveredWithPersistenceDisabled;
+ private boolean anyDataRecovered;
private Stopwatch recoveryTimer;
private final Logger log;
}
boolean handleRecoveryMessage(Object message, PersistentDataProvider persistentProvider) {
- log.trace("handleRecoveryMessage: {}", message);
+ log.trace("{}: handleRecoveryMessage: {}", context.getId(), message);
+
+ anyDataRecovered = anyDataRecovered || !(message instanceof RecoveryCompleted);
boolean recoveryComplete = false;
DataPersistenceProvider persistence = context.getPersistenceProvider();
replicatedLog().removeFrom(((org.opendaylight.controller.cluster.raft.RaftActor.DeleteEntries) message).getFromIndex());
} else if (message instanceof RecoveryCompleted) {
onRecoveryCompletedMessage();
+ possiblyRestoreFromSnapshot();
recoveryComplete = true;
}
} else if (message instanceof RecoveryCompleted) {
// Delete all the akka snapshots as they will not be needed
persistentProvider.deleteSnapshots(new SnapshotSelectionCriteria(scala.Long.MaxValue(),
- scala.Long.MaxValue()));
+ scala.Long.MaxValue(), 0L, 0L));
// Since we cleaned out the journal, we need to re-write the current election info.
context.getTermInformation().updateAndPersist(context.getTermInformation().getCurrentTerm(),
context.getTermInformation().getVotedFor());
}
+
+ possiblyRestoreFromSnapshot();
} else {
boolean isServerConfigPayload = false;
if(message instanceof ReplicatedLogEntry){
return recoveryComplete;
}
+ private void possiblyRestoreFromSnapshot() {
+ byte[] restoreFromSnapshot = cohort.getRestoreFromSnapshot();
+ if(restoreFromSnapshot == null) {
+ return;
+ }
+
+ if(anyDataRecovered) {
+ log.warn("{}: The provided restore snapshot was not applied because the persistence store is not empty",
+ context.getId());
+ return;
+ }
+
+ try(ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(restoreFromSnapshot))) {
+ Snapshot snapshot = (Snapshot) ois.readObject();
+
+ log.debug("{}: Deserialized restore snapshot: {}", context.getId(), snapshot);
+
+ context.getSnapshotManager().apply(new ApplySnapshot(snapshot));
+ } catch(Exception e) {
+ log.error("{}: Error deserializing snapshot restore", context.getId(), e);
+ }
+ }
+
private ReplicatedLog replicatedLog() {
return context.getReplicatedLog();
}
// Apply the snapshot to the actors state
cohort.applyRecoverySnapshot(snapshot.getState());
+ if (snapshot.getServerConfiguration() != null) {
+ context.updatePeerIds(snapshot.getServerConfiguration());
+ }
+
timer.stop();
log.info("Recovery snapshot applied for {} in {}: snapshotIndex={}, snapshotTerm={}, journal-size={}",
context.getId(), timer.toString(), replicatedLog().getSnapshotIndex(),
batchRecoveredLogEntry(logEntry);
} else {
// Shouldn't happen but cover it anyway.
- log.error("Log entry not found for index {}", i);
+ log.error("{}: Log entry not found for index {}", context.getId(), i);
break;
}
}
replicatedLog().getSnapshotTerm(), replicatedLog().size());
}
- private boolean isServerConfigurationPayload(ReplicatedLogEntry repLogEntry){
+ private static boolean isServerConfigurationPayload(ReplicatedLogEntry repLogEntry){
return (repLogEntry.getData() instanceof ServerConfigurationPayload);
}
}