import akka.persistence.SnapshotOffer;
import com.google.common.base.Stopwatch;
import java.util.Collections;
+import java.util.concurrent.TimeUnit;
import org.opendaylight.controller.cluster.PersistentDataProvider;
import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
+import org.opendaylight.controller.cluster.raft.messages.PersistentPayload;
import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
import org.opendaylight.controller.cluster.raft.persisted.DeleteEntries;
import org.opendaylight.controller.cluster.raft.persisted.EmptyState;
import org.opendaylight.controller.cluster.raft.persisted.MigratedSerializable;
import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
+import org.opendaylight.controller.cluster.raft.persisted.Snapshot.State;
import org.opendaylight.controller.cluster.raft.persisted.UpdateElectionTerm;
-import org.opendaylight.controller.cluster.raft.protobuff.client.messages.PersistentPayload;
import org.slf4j.Logger;
/**
private boolean hasMigratedDataRecovered;
private Stopwatch recoveryTimer;
+ private Stopwatch recoverySnapshotTimer;
private final Logger log;
RaftActorRecoverySupport(final RaftActorContext context, final RaftActorRecoveryCohort cohort) {
this.context = context;
this.cohort = cohort;
- this.log = context.getLogger();
+ log = context.getLogger();
}
boolean handleRecoveryMessage(final Object message, final PersistentDataProvider persistentProvider) {
}
boolean recoveryComplete = false;
- if (message instanceof UpdateElectionTerm) {
- context.getTermInformation().update(((UpdateElectionTerm) message).getCurrentTerm(),
- ((UpdateElectionTerm) message).getVotedFor());
- } else if (message instanceof SnapshotOffer) {
- onRecoveredSnapshot((SnapshotOffer) message);
- } else if (message instanceof ReplicatedLogEntry) {
- onRecoveredJournalLogEntry((ReplicatedLogEntry) message);
- } else if (message instanceof ApplyJournalEntries) {
- onRecoveredApplyLogEntries(((ApplyJournalEntries) message).getToIndex());
- } else if (message instanceof DeleteEntries) {
- onDeleteEntries((DeleteEntries) message);
- } else if (message instanceof ServerConfigurationPayload) {
- context.updatePeerIds((ServerConfigurationPayload)message);
+ if (message instanceof UpdateElectionTerm updateElectionTerm) {
+ context.getTermInformation().update(updateElectionTerm.getCurrentTerm(), updateElectionTerm.getVotedFor());
+ } else if (message instanceof SnapshotOffer snapshotOffer) {
+ onRecoveredSnapshot(snapshotOffer);
+ } else if (message instanceof ReplicatedLogEntry replicatedLogEntry) {
+ onRecoveredJournalLogEntry(replicatedLogEntry);
+ } else if (message instanceof ApplyJournalEntries applyJournalEntries) {
+ onRecoveredApplyLogEntries(applyJournalEntries.getToIndex());
+ } else if (message instanceof DeleteEntries deleteEntries) {
+ onDeleteEntries(deleteEntries);
+ } else if (message instanceof ServerConfigurationPayload serverConfigurationPayload) {
+ context.updatePeerIds(serverConfigurationPayload);
} else if (message instanceof RecoveryCompleted) {
recoveryComplete = true;
onRecoveryCompletedMessage(persistentProvider);
return context.getReplicatedLog();
}
- private void initRecoveryTimer() {
+ private void initRecoveryTimers() {
if (recoveryTimer == null) {
recoveryTimer = Stopwatch.createStarted();
}
+ if (recoverySnapshotTimer == null && context.getConfigParams().getRecoverySnapshotIntervalSeconds() > 0) {
+ recoverySnapshotTimer = Stopwatch.createStarted();
+ }
}
private void onRecoveredSnapshot(final SnapshotOffer offer) {
log.debug("{}: SnapshotOffer called.", context.getId());
- initRecoveryTimer();
+ initRecoveryTimers();
Snapshot snapshot = (Snapshot) offer.snapshot();
context.setCommitIndex(snapshot.getLastAppliedIndex());
context.getTermInformation().update(snapshot.getElectionTerm(), snapshot.getElectionVotedFor());
- Stopwatch timer = Stopwatch.createStarted();
+ final Stopwatch timer = Stopwatch.createStarted();
// Apply the snapshot to the actors state
- if (!(snapshot.getState() instanceof EmptyState)) {
- cohort.applyRecoverySnapshot(snapshot.getState());
+ final State snapshotState = snapshot.getState();
+ if (snapshotState.needsMigration()) {
+ hasMigratedDataRecovered = true;
+ }
+ if (!(snapshotState instanceof EmptyState)) {
+ cohort.applyRecoverySnapshot(snapshotState);
}
if (snapshot.getServerConfiguration() != null) {
timer.stop();
log.info("Recovery snapshot applied for {} in {}: snapshotIndex={}, snapshotTerm={}, journal-size={}",
- context.getId(), timer.toString(), replicatedLog().getSnapshotIndex(),
- replicatedLog().getSnapshotTerm(), replicatedLog().size());
+ context.getId(), timer, replicatedLog().getSnapshotIndex(), replicatedLog().getSnapshotTerm(),
+ replicatedLog().size());
}
private void onRecoveredJournalLogEntry(final ReplicatedLogEntry logEntry) {
if (logEntry != null) {
lastApplied++;
batchRecoveredLogEntry(logEntry);
+ if (shouldTakeRecoverySnapshot() && !context.getSnapshotManager().isCapturing()) {
+ if (currentRecoveryBatchCount > 0) {
+ endCurrentLogRecoveryBatch();
+ }
+ context.setLastApplied(lastApplied);
+ context.setCommitIndex(lastApplied);
+ takeRecoverySnapshot(logEntry);
+ }
} else {
// Shouldn't happen but cover it anyway.
log.error("{}: Log entry not found for index {}", context.getId(), i);
}
private void batchRecoveredLogEntry(final ReplicatedLogEntry logEntry) {
- initRecoveryTimer();
+ initRecoveryTimers();
int batchSize = context.getConfigParams().getJournalRecoveryLogBatchSize();
if (!isServerConfigurationPayload(logEntry)) {
}
}
+ private void takeRecoverySnapshot(final ReplicatedLogEntry logEntry) {
+ log.info("Time for recovery snapshot on entry with index {}", logEntry.getIndex());
+ final SnapshotManager snapshotManager = context.getSnapshotManager();
+ if (snapshotManager.capture(logEntry, -1)) {
+ log.info("Capturing snapshot, resetting timer for the next recovery snapshot interval.");
+ recoverySnapshotTimer.reset().start();
+ } else {
+ log.info("SnapshotManager is not able to capture snapshot at this time. It will be retried "
+ + "again with the next recovered entry.");
+ }
+ }
+
+ private boolean shouldTakeRecoverySnapshot() {
+ return recoverySnapshotTimer != null && recoverySnapshotTimer.elapsed(TimeUnit.SECONDS)
+ >= context.getConfigParams().getRecoverySnapshotIntervalSeconds();
+ }
+
private void endCurrentLogRecoveryBatch() {
cohort.applyCurrentLogRecoveryBatch();
currentRecoveryBatchCount = 0;
endCurrentLogRecoveryBatch();
}
- String recoveryTime = "";
+ final String recoveryTime;
if (recoveryTimer != null) {
- recoveryTimer.stop();
- recoveryTime = " in " + recoveryTimer.toString();
+ recoveryTime = " in " + recoveryTimer.stop();
recoveryTimer = null;
+ } else {
+ recoveryTime = "";
}
- log.info("Recovery completed {} - Switching actor to Follower - Persistence Id = {}"
- + " Last index in log = {}, snapshotIndex = {}, snapshotTerm = {}, journal-size = {}",
- recoveryTime, context.getId(), replicatedLog().lastIndex(), replicatedLog().getSnapshotIndex(),
- replicatedLog().getSnapshotTerm(), replicatedLog().size());
+ if (recoverySnapshotTimer != null) {
+ recoverySnapshotTimer.stop();
+ recoverySnapshotTimer = null;
+ }
+
+ log.info("{}: Recovery completed {} - Switching actor to Follower - last log index = {}, last log term = {}, "
+ + "snapshot index = {}, snapshot term = {}, journal size = {}", context.getId(), recoveryTime,
+ replicatedLog().lastIndex(), replicatedLog().lastTerm(), replicatedLog().getSnapshotIndex(),
+ replicatedLog().getSnapshotTerm(), replicatedLog().size());
if (dataRecoveredWithPersistenceDisabled
|| hasMigratedDataRecovered && !context.getPersistenceProvider().isRecoveryApplicable()) {
}
private static boolean isMigratedSerializable(final Object message) {
- return message instanceof MigratedSerializable && ((MigratedSerializable)message).isMigrated();
+ return message instanceof MigratedSerializable migrated && migrated.isMigrated();
}
}