- public void onReceiveRecover(Object message) {
- if (message instanceof SnapshotOffer) {
- onRecoveredSnapshot((SnapshotOffer)message);
- } else if (message instanceof ReplicatedLogEntry) {
- onRecoveredJournalLogEntry((ReplicatedLogEntry)message);
- } else if (message instanceof ApplyLogEntries) {
- onRecoveredApplyLogEntries((ApplyLogEntries)message);
- } else if (message instanceof DeleteEntries) {
- replicatedLog.removeFrom(((DeleteEntries) message).getFromIndex());
- } else if (message instanceof UpdateElectionTerm) {
- context.getTermInformation().update(((UpdateElectionTerm) message).getCurrentTerm(),
- ((UpdateElectionTerm) message).getVotedFor());
- } else if (message instanceof RecoveryCompleted) {
- onRecoveryCompletedMessage();
+ public void handleRecover(Object message) {
+ if(persistence().isRecoveryApplicable()) {
+ if (message instanceof SnapshotOffer) {
+ onRecoveredSnapshot((SnapshotOffer) message);
+ } else if (message instanceof ReplicatedLogEntry) {
+ onRecoveredJournalLogEntry((ReplicatedLogEntry) message);
+ } else if (message instanceof ApplyLogEntries) {
+ onRecoveredApplyLogEntries((ApplyLogEntries) message);
+ } else if (message instanceof DeleteEntries) {
+ replicatedLog.removeFrom(((DeleteEntries) message).getFromIndex());
+ } else if (message instanceof UpdateElectionTerm) {
+ context.getTermInformation().update(((UpdateElectionTerm) message).getCurrentTerm(),
+ ((UpdateElectionTerm) message).getVotedFor());
+ } else if (message instanceof RecoveryCompleted) {
+ onRecoveryCompletedMessage();
+ }
+ } else {
+ if (message instanceof RecoveryCompleted) {
+ // Delete all the messages from the akka journal so that we do not end up with consistency issues
+ // Note I am not using the dataPersistenceProvider and directly using the akka api here
+ deleteMessages(lastSequenceNr());
+
+ // Delete all the akka snapshots as they will not be needed
+ deleteSnapshots(new SnapshotSelectionCriteria(scala.Long.MaxValue(), scala.Long.MaxValue()));
+
+ onRecoveryComplete();
+ currentBehavior = new Follower(context);
+ onStateChanged();
+ }