- } else if (message instanceof ReplicatedLogEntry) {
- replicatedLog.append((ReplicatedLogEntry) message);
- } else if (message instanceof DeleteEntries) {
- replicatedLog.removeFrom(((DeleteEntries) message).getFromIndex());
- } else if (message instanceof UpdateElectionTerm) {
- context.getTermInformation().update(((UpdateElectionTerm) message).getCurrentTerm(), ((UpdateElectionTerm) message).getVotedFor());
- } else if (message instanceof RecoveryCompleted) {
- LOG.debug(
- "RecoveryCompleted - Switching actor to Follower - " +
- "Last index in log:{}, snapshotIndex={}, snapshotTerm={}, " +
- "journal-size={}",
- replicatedLog.lastIndex(), replicatedLog.snapshotIndex,
- replicatedLog.snapshotTerm, replicatedLog.size());
- currentBehavior = switchBehavior(RaftState.Follower);
- onStateChanged();
- }
- }
-
- @Override public void onReceiveCommand(Object message) {
+ timer.stop();
+ LOG.info("Recovery snapshot applied for {} in {}: snapshotIndex={}, snapshotTerm={}, journal-size=" +
+ replicatedLog.size(), persistenceId(), timer.toString(),
+ replicatedLog.snapshotIndex, replicatedLog.snapshotTerm);
+ }
+
+ private void onRecoveredJournalLogEntry(ReplicatedLogEntry logEntry) {
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Received ReplicatedLogEntry for recovery: {}", logEntry.getIndex());
+ }
+
+ replicatedLog.append(logEntry);
+ }
+
+ private void onRecoveredApplyLogEntries(ApplyLogEntries ale) {
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Received ApplyLogEntries for recovery, applying to state: {} to {}",
+ context.getLastApplied() + 1, ale.getToIndex());
+ }
+
+ for (long i = context.getLastApplied() + 1; i <= ale.getToIndex(); i++) {
+ batchRecoveredLogEntry(replicatedLog.get(i));
+ }
+
+ context.setLastApplied(ale.getToIndex());
+ context.setCommitIndex(ale.getToIndex());
+ }
+
+ private void batchRecoveredLogEntry(ReplicatedLogEntry logEntry) {
+ initRecoveryTimer();
+
+ int batchSize = context.getConfigParams().getJournalRecoveryLogBatchSize();
+ if(currentRecoveryBatchCount == 0) {
+ startLogRecoveryBatch(batchSize);
+ }
+
+ appendRecoveredLogEntry(logEntry.getData());
+
+ if(++currentRecoveryBatchCount >= batchSize) {
+ endCurrentLogRecoveryBatch();
+ }
+ }
+
+ private void endCurrentLogRecoveryBatch() {
+ applyCurrentLogRecoveryBatch();
+ currentRecoveryBatchCount = 0;
+ }
+
+ private void onRecoveryCompletedMessage() {
+ if(currentRecoveryBatchCount > 0) {
+ endCurrentLogRecoveryBatch();
+ }
+
+ onRecoveryComplete();
+
+ String recoveryTime = "";
+ if(recoveryTimer != null) {
+ recoveryTimer.stop();
+ recoveryTime = " in " + recoveryTimer.toString();
+ recoveryTimer = null;
+ }
+
+ LOG.info(
+ "Recovery completed" + recoveryTime + " - Switching actor to Follower - " +
+ "Persistence Id = " + persistenceId() +
+ " Last index in log={}, snapshotIndex={}, snapshotTerm={}, " +
+ "journal-size={}",
+ replicatedLog.lastIndex(), replicatedLog.snapshotIndex,
+ replicatedLog.snapshotTerm, replicatedLog.size());
+
+ currentBehavior = new Follower(context);
+ onStateChanged();
+ }
+
+ @Override public void handleCommand(Object message) {