import akka.persistence.SnapshotOffer;
import com.google.common.base.Stopwatch;
import java.util.Collections;
+import java.util.concurrent.TimeUnit;
import org.opendaylight.controller.cluster.PersistentDataProvider;
import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
+import org.opendaylight.controller.cluster.raft.messages.PersistentPayload;
import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
import org.opendaylight.controller.cluster.raft.persisted.DeleteEntries;
import org.opendaylight.controller.cluster.raft.persisted.EmptyState;
import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
import org.opendaylight.controller.cluster.raft.persisted.Snapshot.State;
import org.opendaylight.controller.cluster.raft.persisted.UpdateElectionTerm;
-import org.opendaylight.controller.cluster.raft.protobuff.client.messages.PersistentPayload;
import org.slf4j.Logger;
/**
private boolean hasMigratedDataRecovered;
private Stopwatch recoveryTimer;
+ private Stopwatch recoverySnapshotTimer;
private final Logger log;
RaftActorRecoverySupport(final RaftActorContext context, final RaftActorRecoveryCohort cohort) {
this.context = context;
this.cohort = cohort;
- this.log = context.getLogger();
+ log = context.getLogger();
}
boolean handleRecoveryMessage(final Object message, final PersistentDataProvider persistentProvider) {
return context.getReplicatedLog();
}
- private void initRecoveryTimer() {
+ private void initRecoveryTimers() {
if (recoveryTimer == null) {
recoveryTimer = Stopwatch.createStarted();
}
+ if (recoverySnapshotTimer == null && context.getConfigParams().getRecoverySnapshotIntervalSeconds() > 0) {
+ recoverySnapshotTimer = Stopwatch.createStarted();
+ }
}
private void onRecoveredSnapshot(final SnapshotOffer offer) {
log.debug("{}: SnapshotOffer called.", context.getId());
- initRecoveryTimer();
+ initRecoveryTimers();
Snapshot snapshot = (Snapshot) offer.snapshot();
if (logEntry != null) {
lastApplied++;
batchRecoveredLogEntry(logEntry);
+ if (shouldTakeRecoverySnapshot() && !context.getSnapshotManager().isCapturing()) {
+ if (currentRecoveryBatchCount > 0) {
+ endCurrentLogRecoveryBatch();
+ }
+ context.setLastApplied(lastApplied);
+ context.setCommitIndex(lastApplied);
+ takeRecoverySnapshot(logEntry);
+ }
} else {
// Shouldn't happen but cover it anyway.
log.error("{}: Log entry not found for index {}", context.getId(), i);
}
private void batchRecoveredLogEntry(final ReplicatedLogEntry logEntry) {
- initRecoveryTimer();
+ initRecoveryTimers();
int batchSize = context.getConfigParams().getJournalRecoveryLogBatchSize();
if (!isServerConfigurationPayload(logEntry)) {
}
}
+ private void takeRecoverySnapshot(final ReplicatedLogEntry logEntry) {
+ log.info("Time for recovery snapshot on entry with index {}", logEntry.getIndex());
+ final SnapshotManager snapshotManager = context.getSnapshotManager();
+ if (snapshotManager.capture(logEntry, -1)) {
+ log.info("Capturing snapshot, resetting timer for the next recovery snapshot interval.");
+ recoverySnapshotTimer.reset().start();
+ } else {
+ log.info("SnapshotManager is not able to capture snapshot at this time. It will be retried "
+ + "again with the next recovered entry.");
+ }
+ }
+
+ private boolean shouldTakeRecoverySnapshot() {
+ return recoverySnapshotTimer != null && recoverySnapshotTimer.elapsed(TimeUnit.SECONDS)
+ >= context.getConfigParams().getRecoverySnapshotIntervalSeconds();
+ }
+
private void endCurrentLogRecoveryBatch() {
cohort.applyCurrentLogRecoveryBatch();
currentRecoveryBatchCount = 0;
recoveryTime = "";
}
+ if (recoverySnapshotTimer != null) {
+ recoverySnapshotTimer.stop();
+ recoverySnapshotTimer = null;
+ }
+
log.info("{}: Recovery completed {} - Switching actor to Follower - last log index = {}, last log term = {}, "
+ "snapshot index = {}, snapshot term = {}, journal size = {}", context.getId(), recoveryTime,
replicatedLog().lastIndex(), replicatedLog().lastTerm(), replicatedLog().getSnapshotIndex(),
}
private static boolean isMigratedSerializable(final Object message) {
- return message instanceof MigratedSerializable && ((MigratedSerializable)message).isMigrated();
+ return message instanceof MigratedSerializable migrated && migrated.isMigrated();
}
}