import akka.persistence.SnapshotOffer;
import akka.persistence.SnapshotSelectionCriteria;
import com.google.common.base.Stopwatch;
+import java.io.ByteArrayInputStream;
+import java.io.ObjectInputStream;
import org.opendaylight.controller.cluster.DataPersistenceProvider;
import org.opendaylight.controller.cluster.PersistentDataProvider;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
-import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
+import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
import org.opendaylight.controller.cluster.raft.base.messages.DeleteEntries;
import org.opendaylight.controller.cluster.raft.base.messages.UpdateElectionTerm;
-import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
+import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
import org.slf4j.Logger;
/**
*/
class RaftActorRecoverySupport {
private final RaftActorContext context;
- private final RaftActorBehavior currentBehavior;
private final RaftActorRecoveryCohort cohort;
private int currentRecoveryBatchCount;
private boolean dataRecoveredWithPersistenceDisabled;
+ private boolean anyDataRecovered;
private Stopwatch recoveryTimer;
private final Logger log;
- RaftActorRecoverySupport(RaftActorContext context, RaftActorBehavior currentBehavior,
- RaftActorRecoveryCohort cohort) {
+ RaftActorRecoverySupport(final RaftActorContext context, final RaftActorRecoveryCohort cohort) {
this.context = context;
- this.currentBehavior = currentBehavior;
this.cohort = cohort;
this.log = context.getLogger();
}
boolean handleRecoveryMessage(Object message, PersistentDataProvider persistentProvider) {
- log.trace("handleRecoveryMessage: {}", message);
+ log.trace("{}: handleRecoveryMessage: {}", context.getId(), message);
+
+ anyDataRecovered = anyDataRecovered || !(message instanceof RecoveryCompleted);
boolean recoveryComplete = false;
DataPersistenceProvider persistence = context.getPersistenceProvider();
onRecoveredSnapshot((SnapshotOffer) message);
} else if (message instanceof ReplicatedLogEntry) {
onRecoveredJournalLogEntry((ReplicatedLogEntry) message);
- } else if (message instanceof ApplyLogEntries) {
- // Handle this message for backwards compatibility with pre-Lithium versions.
- onRecoveredApplyLogEntries(((ApplyLogEntries) message).getToIndex());
} else if (message instanceof ApplyJournalEntries) {
onRecoveredApplyLogEntries(((ApplyJournalEntries) message).getToIndex());
} else if (message instanceof DeleteEntries) {
replicatedLog().removeFrom(((org.opendaylight.controller.cluster.raft.RaftActor.DeleteEntries) message).getFromIndex());
} else if (message instanceof RecoveryCompleted) {
onRecoveryCompletedMessage();
+ possiblyRestoreFromSnapshot();
recoveryComplete = true;
}
} else if (message instanceof RecoveryCompleted) {
// Delete all the akka snapshots as they will not be needed
persistentProvider.deleteSnapshots(new SnapshotSelectionCriteria(scala.Long.MaxValue(),
- scala.Long.MaxValue()));
+ scala.Long.MaxValue(), 0L, 0L));
// Since we cleaned out the journal, we need to re-write the current election info.
context.getTermInformation().updateAndPersist(context.getTermInformation().getCurrentTerm(),
context.getTermInformation().getVotedFor());
}
+
+ onRecoveryCompletedMessage();
+ possiblyRestoreFromSnapshot();
} else {
boolean isServerConfigPayload = false;
if(message instanceof ReplicatedLogEntry){
return recoveryComplete;
}
+ private void possiblyRestoreFromSnapshot() {
+ byte[] restoreFromSnapshot = cohort.getRestoreFromSnapshot();
+ if(restoreFromSnapshot == null) {
+ return;
+ }
+
+ if(anyDataRecovered) {
+ log.warn("{}: The provided restore snapshot was not applied because the persistence store is not empty",
+ context.getId());
+ return;
+ }
+
+ try(ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(restoreFromSnapshot))) {
+ Snapshot snapshot = (Snapshot) ois.readObject();
+
+ log.debug("{}: Deserialized restore snapshot: {}", context.getId(), snapshot);
+
+ context.getSnapshotManager().apply(new ApplySnapshot(snapshot));
+ } catch(Exception e) {
+ log.error("{}: Error deserializing snapshot restore", context.getId(), e);
+ }
+ }
+
private ReplicatedLog replicatedLog() {
return context.getReplicatedLog();
}
// The replicated log can be used later on to retrieve this snapshot
// when we need to install it on a peer
- context.setReplicatedLog(ReplicatedLogImpl.newInstance(snapshot, context, currentBehavior));
+ context.setReplicatedLog(ReplicatedLogImpl.newInstance(snapshot, context));
context.setLastApplied(snapshot.getLastAppliedIndex());
context.setCommitIndex(snapshot.getLastAppliedIndex());
context.getTermInformation().update(snapshot.getElectionTerm(), snapshot.getElectionVotedFor());
// Apply the snapshot to the actors state
cohort.applyRecoverySnapshot(snapshot.getState());
+ if (snapshot.getServerConfiguration() != null) {
+ context.updatePeerIds(snapshot.getServerConfiguration());
+ }
+
timer.stop();
log.info("Recovery snapshot applied for {} in {}: snapshotIndex={}, snapshotTerm={}, journal-size={}",
context.getId(), timer.toString(), replicatedLog().getSnapshotIndex(),
batchRecoveredLogEntry(logEntry);
} else {
// Shouldn't happen but cover it anyway.
- log.error("Log entry not found for index {}", i);
+ log.error("{}: Log entry not found for index {}", context.getId(), i);
break;
}
}
replicatedLog().getSnapshotTerm(), replicatedLog().size());
}
- private boolean isServerConfigurationPayload(ReplicatedLogEntry repLogEntry){
+ private static boolean isServerConfigurationPayload(ReplicatedLogEntry repLogEntry){
return (repLogEntry.getData() instanceof ServerConfigurationPayload);
}
}