- if(persistence().isRecoveryApplicable()) {
- if (message instanceof SnapshotOffer) {
- onRecoveredSnapshot((SnapshotOffer) message);
- } else if (message instanceof ReplicatedLogEntry) {
- onRecoveredJournalLogEntry((ReplicatedLogEntry) message);
- } else if (message instanceof ApplyLogEntries) {
- // Handle this message for backwards compatibility with pre-Lithium versions.
- onRecoveredApplyLogEntries(((ApplyLogEntries) message).getToIndex());
- } else if (message instanceof ApplyJournalEntries) {
- onRecoveredApplyLogEntries(((ApplyJournalEntries) message).getToIndex());
- } else if (message instanceof DeleteEntries) {
- replicatedLog.removeFrom(((DeleteEntries) message).getFromIndex());
- } else if (message instanceof UpdateElectionTerm) {
- context.getTermInformation().update(((UpdateElectionTerm) message).getCurrentTerm(),
- ((UpdateElectionTerm) message).getVotedFor());
- } else if (message instanceof RecoveryCompleted) {
- onRecoveryCompletedMessage();
- }
- } else {
- if (message instanceof RecoveryCompleted) {
- // Delete all the messages from the akka journal so that we do not end up with consistency issues
- // Note I am not using the dataPersistenceProvider and directly using the akka api here
- deleteMessages(lastSequenceNr());
-
- // Delete all the akka snapshots as they will not be needed
- deleteSnapshots(new SnapshotSelectionCriteria(scala.Long.MaxValue(), scala.Long.MaxValue()));
-
- onRecoveryComplete();
-
- initializeBehavior();
- }
- }
- }
-
- private void onRecoveredSnapshot(SnapshotOffer offer) {
- if(LOG.isDebugEnabled()) {
- LOG.debug("{}: SnapshotOffer called..", persistenceId());
- }
-
- initRecoveryTimer();
-
- Snapshot snapshot = (Snapshot) offer.snapshot();
-
- // Create a replicated log with the snapshot information
- // The replicated log can be used later on to retrieve this snapshot
- // when we need to install it on a peer
- replicatedLog = new ReplicatedLogImpl(snapshot);
-
- context.setReplicatedLog(replicatedLog);
- context.setLastApplied(snapshot.getLastAppliedIndex());
- context.setCommitIndex(snapshot.getLastAppliedIndex());
-
- Stopwatch timer = Stopwatch.createStarted();
-
- // Apply the snapshot to the actors state
- applyRecoverySnapshot(snapshot.getState());
-
- timer.stop();
- LOG.info("Recovery snapshot applied for {} in {}: snapshotIndex={}, snapshotTerm={}, journal-size=" +
- replicatedLog.size(), persistenceId(), timer.toString(),
- replicatedLog.getSnapshotIndex(), replicatedLog.getSnapshotTerm());
- }
-
- private void onRecoveredJournalLogEntry(ReplicatedLogEntry logEntry) {
- if(LOG.isDebugEnabled()) {
- LOG.debug("{}: Received ReplicatedLogEntry for recovery: {}", persistenceId(), logEntry.getIndex());
- }
-
- replicatedLog.append(logEntry);
- }
-
- private void onRecoveredApplyLogEntries(long toIndex) {
- if(LOG.isDebugEnabled()) {
- LOG.debug("{}: Received ApplyLogEntries for recovery, applying to state: {} to {}",
- persistenceId(), context.getLastApplied() + 1, toIndex);