+ private void onRecoveredSnapshot(SnapshotOffer offer) {
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("SnapshotOffer called..");
+ }
+
+ initRecoveryTimer();
+
+ Snapshot snapshot = (Snapshot) offer.snapshot();
+
+ // Create a replicated log with the snapshot information
+ // The replicated log can be used later on to retrieve this snapshot
+ // when we need to install it on a peer
+ replicatedLog = new ReplicatedLogImpl(snapshot);
+
+ context.setReplicatedLog(replicatedLog);
+ context.setLastApplied(snapshot.getLastAppliedIndex());
+ context.setCommitIndex(snapshot.getLastAppliedIndex());
+
+ Stopwatch timer = new Stopwatch();
+ timer.start();
+
+ // Apply the snapshot to the actors state
+ applyRecoverySnapshot(ByteString.copyFrom(snapshot.getState()));
+
+ timer.stop();
+ LOG.info("Recovery snapshot applied for {} in {}: snapshotIndex={}, snapshotTerm={}, journal-size=" +
+ replicatedLog.size(), persistenceId(), timer.toString(),
+ replicatedLog.snapshotIndex, replicatedLog.snapshotTerm);
+ }
+
+ private void onRecoveredJournalLogEntry(ReplicatedLogEntry logEntry) {
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Received ReplicatedLogEntry for recovery: {}", logEntry.getIndex());
+ }
+
+ replicatedLog.append(logEntry);
+ }
+
+ private void onRecoveredApplyLogEntries(ApplyLogEntries ale) {
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Received ApplyLogEntries for recovery, applying to state: {} to {}",
+ context.getLastApplied() + 1, ale.getToIndex());
+ }
+
+ for (long i = context.getLastApplied() + 1; i <= ale.getToIndex(); i++) {
+ batchRecoveredLogEntry(replicatedLog.get(i));
+ }
+
+ context.setLastApplied(ale.getToIndex());
+ context.setCommitIndex(ale.getToIndex());
+ }
+
+ private void batchRecoveredLogEntry(ReplicatedLogEntry logEntry) {
+ initRecoveryTimer();
+
+ int batchSize = context.getConfigParams().getJournalRecoveryLogBatchSize();
+ if(currentRecoveryBatchCount == 0) {
+ startLogRecoveryBatch(batchSize);
+ }
+
+ appendRecoveredLogEntry(logEntry.getData());
+
+ if(++currentRecoveryBatchCount >= batchSize) {
+ endCurrentLogRecoveryBatch();
+ }
+ }
+
+ private void endCurrentLogRecoveryBatch() {
+ applyCurrentLogRecoveryBatch();
+ currentRecoveryBatchCount = 0;
+ }
+
+ private void onRecoveryCompletedMessage() {
+ if(currentRecoveryBatchCount > 0) {
+ endCurrentLogRecoveryBatch();
+ }
+
+ onRecoveryComplete();
+
+ String recoveryTime = "";
+ if(recoveryTimer != null) {
+ recoveryTimer.stop();
+ recoveryTime = " in " + recoveryTimer.toString();
+ recoveryTimer = null;
+ }
+
+ LOG.info(
+ "Recovery completed" + recoveryTime + " - Switching actor to Follower - " +
+ "Persistence Id = " + persistenceId() +
+ " Last index in log={}, snapshotIndex={}, snapshotTerm={}, " +
+ "journal-size={}",
+ replicatedLog.lastIndex(), replicatedLog.snapshotIndex,
+ replicatedLog.snapshotTerm, replicatedLog.size());
+
+ RaftActorBehavior oldBehavior = currentBehavior;
+ currentBehavior = new Follower(context);
+ handleBehaviorChange(oldBehavior, currentBehavior);
+ }
+
+ @Override public void handleCommand(Object message) {