RaftActorRecoverySupport(final RaftActorContext context, final RaftActorRecoveryCohort cohort) {
this.context = context;
this.cohort = cohort;
- this.log = context.getLogger();
+ log = context.getLogger();
}
boolean handleRecoveryMessage(final Object message, final PersistentDataProvider persistentProvider) {
}
boolean recoveryComplete = false;
- if (message instanceof UpdateElectionTerm) {
- context.getTermInformation().update(((UpdateElectionTerm) message).getCurrentTerm(),
- ((UpdateElectionTerm) message).getVotedFor());
- } else if (message instanceof SnapshotOffer) {
- onRecoveredSnapshot((SnapshotOffer) message);
- } else if (message instanceof ReplicatedLogEntry) {
- onRecoveredJournalLogEntry((ReplicatedLogEntry) message);
- } else if (message instanceof ApplyJournalEntries) {
- onRecoveredApplyLogEntries(((ApplyJournalEntries) message).getToIndex());
- } else if (message instanceof DeleteEntries) {
- onDeleteEntries((DeleteEntries) message);
- } else if (message instanceof ServerConfigurationPayload) {
- context.updatePeerIds((ServerConfigurationPayload)message);
+ if (message instanceof UpdateElectionTerm updateElectionTerm) {
+ context.getTermInformation().update(updateElectionTerm.getCurrentTerm(), updateElectionTerm.getVotedFor());
+ } else if (message instanceof SnapshotOffer snapshotOffer) {
+ onRecoveredSnapshot(snapshotOffer);
+ } else if (message instanceof ReplicatedLogEntry replicatedLogEntry) {
+ onRecoveredJournalLogEntry(replicatedLogEntry);
+ } else if (message instanceof ApplyJournalEntries applyJournalEntries) {
+ onRecoveredApplyLogEntries(applyJournalEntries.getToIndex());
+ } else if (message instanceof DeleteEntries deleteEntries) {
+ onDeleteEntries(deleteEntries);
+ } else if (message instanceof ServerConfigurationPayload serverConfigurationPayload) {
+ context.updatePeerIds(serverConfigurationPayload);
} else if (message instanceof RecoveryCompleted) {
recoveryComplete = true;
onRecoveryCompletedMessage(persistentProvider);
final SnapshotManager snapshotManager = context.getSnapshotManager();
if (snapshotManager.capture(logEntry, -1)) {
log.info("Capturing snapshot, resetting timer for the next recovery snapshot interval.");
- this.recoverySnapshotTimer.reset().start();
+ recoverySnapshotTimer.reset().start();
} else {
log.info("SnapshotManager is not able to capture snapshot at this time. It will be retried "
+ "again with the next recovered entry.");
}
private boolean shouldTakeRecoverySnapshot() {
- return this.recoverySnapshotTimer != null && this.recoverySnapshotTimer.elapsed(TimeUnit.SECONDS)
+ return recoverySnapshotTimer != null && recoverySnapshotTimer.elapsed(TimeUnit.SECONDS)
>= context.getConfigParams().getRecoverySnapshotIntervalSeconds();
}
}
private static boolean isMigratedSerializable(final Object message) {
- return message instanceof MigratedSerializable && ((MigratedSerializable)message).isMigrated();
+ return message instanceof MigratedSerializable migrated && migrated.isMigrated();
}
}