- } else if (message instanceof ReplicatedLogEntry) {
- replicatedLog.append((ReplicatedLogEntry) message);
- } else if (message instanceof DeleteEntries) {
- replicatedLog.removeFrom(((DeleteEntries) message).getFromIndex());
- } else if (message instanceof UpdateElectionTerm) {
- context.getTermInformation().update(((UpdateElectionTerm) message).getCurrentTerm(), ((UpdateElectionTerm) message).getVotedFor());
- } else if (message instanceof RecoveryCompleted) {
- LOG.debug(
- "RecoveryCompleted - Switching actor to Follower - " +
- "Last index in log:{}, snapshotIndex={}, snapshotTerm={}, " +
- "journal-size={}",
- replicatedLog.lastIndex(), replicatedLog.snapshotIndex,
- replicatedLog.snapshotTerm, replicatedLog.size());
- currentBehavior = switchBehavior(RaftState.Follower);
- onStateChanged();
+ initializeBehavior();
+ }
+ }
+ }
+
+ private void onRecoveredSnapshot(SnapshotOffer offer) {
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("{}: SnapshotOffer called..", persistenceId());
+ }
+
+ initRecoveryTimer();
+
+ Snapshot snapshot = (Snapshot) offer.snapshot();
+
+ // Create a replicated log with the snapshot information
+ // The replicated log can be used later on to retrieve this snapshot
+ // when we need to install it on a peer
+ replicatedLog = new ReplicatedLogImpl(snapshot);
+
+ context.setReplicatedLog(replicatedLog);
+ context.setLastApplied(snapshot.getLastAppliedIndex());
+ context.setCommitIndex(snapshot.getLastAppliedIndex());
+
+ Stopwatch timer = Stopwatch.createStarted();
+
+ // Apply the snapshot to the actors state
+ applyRecoverySnapshot(snapshot.getState());
+
+ timer.stop();
+ LOG.info("Recovery snapshot applied for {} in {}: snapshotIndex={}, snapshotTerm={}, journal-size=" +
+ replicatedLog.size(), persistenceId(), timer.toString(),
+ replicatedLog.getSnapshotIndex(), replicatedLog.getSnapshotTerm());
+ }
+
+ private void onRecoveredJournalLogEntry(ReplicatedLogEntry logEntry) {
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("{}: Received ReplicatedLogEntry for recovery: {}", persistenceId(), logEntry.getIndex());
+ }
+
+ replicatedLog.append(logEntry);
+ }
+
+ private void onRecoveredApplyLogEntries(ApplyLogEntries ale) {
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("{}: Received ApplyLogEntries for recovery, applying to state: {} to {}",
+ persistenceId(), context.getLastApplied() + 1, ale.getToIndex());
+ }
+
+ for (long i = context.getLastApplied() + 1; i <= ale.getToIndex(); i++) {
+ batchRecoveredLogEntry(replicatedLog.get(i));
+ }
+
+ context.setLastApplied(ale.getToIndex());
+ context.setCommitIndex(ale.getToIndex());
+ }
+
+ private void batchRecoveredLogEntry(ReplicatedLogEntry logEntry) {
+ initRecoveryTimer();
+
+ int batchSize = context.getConfigParams().getJournalRecoveryLogBatchSize();
+ if(currentRecoveryBatchCount == 0) {
+ startLogRecoveryBatch(batchSize);
+ }
+
+ appendRecoveredLogEntry(logEntry.getData());
+
+ if(++currentRecoveryBatchCount >= batchSize) {
+ endCurrentLogRecoveryBatch();
+ }
+ }
+
+ private void endCurrentLogRecoveryBatch() {
+ applyCurrentLogRecoveryBatch();
+ currentRecoveryBatchCount = 0;
+ }
+
+ private void onRecoveryCompletedMessage() {
+ if(currentRecoveryBatchCount > 0) {
+ endCurrentLogRecoveryBatch();