- // Create a replicated log with the snapshot information
- // The replicated log can be used later on to retrieve this snapshot
- // when we need to install it on a peer
-
- context.setReplicatedLog(ReplicatedLogImpl.newInstance(snapshot, context, delegatingPersistenceProvider,
- currentBehavior));
- context.setLastApplied(snapshot.getLastAppliedIndex());
- context.setCommitIndex(snapshot.getLastAppliedIndex());
-
- Stopwatch timer = Stopwatch.createStarted();
-
- // Apply the snapshot to the actors state
- applyRecoverySnapshot(snapshot.getState());
-
- timer.stop();
- LOG.info("Recovery snapshot applied for {} in {}: snapshotIndex={}, snapshotTerm={}, journal-size=" +
- replicatedLog().size(), persistenceId(), timer.toString(),
- replicatedLog().getSnapshotIndex(), replicatedLog().getSnapshotTerm());
- }
-
- private void onRecoveredJournalLogEntry(ReplicatedLogEntry logEntry) {
- if(LOG.isDebugEnabled()) {
- LOG.debug("{}: Received ReplicatedLogEntry for recovery: {}", persistenceId(), logEntry.getIndex());
- }
-
- replicatedLog().append(logEntry);
- }
-
- private void onRecoveredApplyLogEntries(long toIndex) {
- if(LOG.isDebugEnabled()) {
- LOG.debug("{}: Received ApplyLogEntries for recovery, applying to state: {} to {}",
- persistenceId(), context.getLastApplied() + 1, toIndex);
- }
-
- for (long i = context.getLastApplied() + 1; i <= toIndex; i++) {
- batchRecoveredLogEntry(replicatedLog().get(i));
- }
-
- context.setLastApplied(toIndex);
- context.setCommitIndex(toIndex);
- }
-
- private void batchRecoveredLogEntry(ReplicatedLogEntry logEntry) {
- initRecoveryTimer();
-
- int batchSize = context.getConfigParams().getJournalRecoveryLogBatchSize();
- if(currentRecoveryBatchCount == 0) {
- startLogRecoveryBatch(batchSize);
- }
-
- appendRecoveredLogEntry(logEntry.getData());
-
- if(++currentRecoveryBatchCount >= batchSize) {
- endCurrentLogRecoveryBatch();