import akka.persistence.RecoveryCompleted;
import akka.persistence.SnapshotOffer;
+import akka.persistence.SnapshotSelectionCriteria;
import com.google.common.base.Stopwatch;
+import org.opendaylight.controller.cluster.DataPersistenceProvider;
+import org.opendaylight.controller.cluster.PersistentDataProvider;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
import org.opendaylight.controller.cluster.raft.base.messages.DeleteEntries;
private final RaftActorRecoveryCohort cohort;
private int currentRecoveryBatchCount;
+ private boolean dataRecoveredWithPersistenceDisabled;
private Stopwatch recoveryTimer;
private final Logger log;
this.log = context.getLogger();
}
- boolean handleRecoveryMessage(Object message) {
+ boolean handleRecoveryMessage(Object message, PersistentDataProvider persistentProvider) {
+ log.trace("handleRecoveryMessage: {}", message);
+
boolean recoveryComplete = false;
- if(context.getPersistenceProvider().isRecoveryApplicable()) {
+ DataPersistenceProvider persistence = context.getPersistenceProvider();
+ if (message instanceof org.opendaylight.controller.cluster.raft.RaftActor.UpdateElectionTerm) {
+ // Handle this message for backwards compatibility with pre-Lithium versions.
+ org.opendaylight.controller.cluster.raft.RaftActor.UpdateElectionTerm update =
+ (org.opendaylight.controller.cluster.raft.RaftActor.UpdateElectionTerm)message;
+ context.getTermInformation().update(update.getCurrentTerm(), update.getVotedFor());
+ } else if (message instanceof UpdateElectionTerm) {
+ context.getTermInformation().update(((UpdateElectionTerm) message).getCurrentTerm(),
+ ((UpdateElectionTerm) message).getVotedFor());
+ } else if(persistence.isRecoveryApplicable()) {
if (message instanceof SnapshotOffer) {
onRecoveredSnapshot((SnapshotOffer) message);
} else if (message instanceof ReplicatedLogEntry) {
} else if (message instanceof org.opendaylight.controller.cluster.raft.RaftActor.DeleteEntries) {
// Handle this message for backwards compatibility with pre-Lithium versions.
replicatedLog().removeFrom(((org.opendaylight.controller.cluster.raft.RaftActor.DeleteEntries) message).getFromIndex());
- } else if (message instanceof org.opendaylight.controller.cluster.raft.RaftActor.UpdateElectionTerm) {
- // Handle this message for backwards compatibility with pre-Lithium versions.
- org.opendaylight.controller.cluster.raft.RaftActor.UpdateElectionTerm update =
- (org.opendaylight.controller.cluster.raft.RaftActor.UpdateElectionTerm)message;
- context.getTermInformation().update(update.getCurrentTerm(), update.getVotedFor());
- } else if (message instanceof UpdateElectionTerm) {
- context.getTermInformation().update(((UpdateElectionTerm) message).getCurrentTerm(),
- ((UpdateElectionTerm) message).getVotedFor());
} else if (message instanceof RecoveryCompleted) {
onRecoveryCompletedMessage();
recoveryComplete = true;
}
} else if (message instanceof RecoveryCompleted) {
recoveryComplete = true;
+
+ if(dataRecoveredWithPersistenceDisabled) {
+ // Data persistence is disabled but we recovered some data entries so we must have just
+ // transitioned to disabled or a persistence backup was restored. Either way, delete all the
+ // messages from the akka journal for efficiency and so that we do not end up with consistency
+ // issues in case persistence is -re-enabled.
+ persistentProvider.deleteMessages(persistentProvider.getLastSequenceNumber());
+
+ // Delete all the akka snapshots as they will not be needed
+ persistentProvider.deleteSnapshots(new SnapshotSelectionCriteria(scala.Long.MaxValue(),
+ scala.Long.MaxValue()));
+
+ // Since we cleaned out the journal, we need to re-write the current election info.
+ context.getTermInformation().updateAndPersist(context.getTermInformation().getCurrentTerm(),
+ context.getTermInformation().getVotedFor());
+ }
+ } else {
+ dataRecoveredWithPersistenceDisabled = true;
}
return recoveryComplete;