2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.raft;
10 import static java.util.Objects.requireNonNull;
12 import com.google.common.annotations.VisibleForTesting;
13 import com.google.common.io.ByteSource;
14 import java.io.IOException;
15 import java.io.OutputStream;
16 import java.util.Optional;
17 import java.util.function.Consumer;
18 import org.apache.pekko.persistence.SnapshotSelectionCriteria;
19 import org.eclipse.jdt.annotation.NonNull;
20 import org.eclipse.jdt.annotation.NonNullByDefault;
21 import org.eclipse.jdt.annotation.Nullable;
22 import org.opendaylight.controller.cluster.io.FileBackedOutputStream;
23 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
24 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
25 import org.opendaylight.controller.cluster.raft.base.messages.SnapshotComplete;
26 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
27 import org.opendaylight.controller.cluster.raft.persisted.EmptyState;
28 import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
29 import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
30 import org.opendaylight.controller.cluster.raft.spi.ImmutableRaftEntryMeta;
31 import org.opendaylight.controller.cluster.raft.spi.RaftEntryMeta;
32 import org.slf4j.Logger;
35 * Manages the capturing of snapshots for a RaftActor.
38 * @author Thomas Pantelis
40 public class SnapshotManager implements SnapshotState {
42 * Internal message, issued by follower behavior to its actor, eventually routed to {@link SnapshotManager}.
43 * Metadata matches information conveyed in {@link InstallSnapshot}.
46 public record ApplyLeaderSnapshot(
49 ImmutableRaftEntryMeta lastEntry,
51 @Nullable ServerConfigurationPayload serverConfig,
52 ApplyLeaderSnapshot.Callback callback) {
53 public ApplyLeaderSnapshot {
54 requireNonNull(leaderId);
55 requireNonNull(lastEntry);
56 requireNonNull(snapshot);
57 requireNonNull(callback);
58 // TODO: sanity check term vs. lastEntry ?
61 public interface Callback {
70 * The task being executed by this instance.
72 private sealed interface Task {
77 * This instance is capturing current user state, for example to save a state snapshot prior to purging journal
81 private record Capture(long lastSequenceNumber, CaptureSnapshot request) implements Task {
83 requireNonNull(request);
88 * This instance is just sitting here, doing nothing in particular.
91 private static final class Idle implements Task {
92 private static final Idle INSTANCE = new Idle();
96 * This instance is talking to persistence for some reason. We have started talking to persistence when it was
97 * at {@linkplain #lastSequenceNumber}.
99 private sealed interface Persist extends Task {
101 * Returns the last sequence number reported by persistence when this task started.
103 * @return persistence last sequence number
105 long lastSequenceNumber();
109 * This instance is persisting an {@link ApplyLeaderSnapshot}.
112 private record PersistApply(
113 long lastSequenceNumber,
115 ApplyLeaderSnapshot.@Nullable Callback callback) implements Persist {
117 requireNonNull(snapshot);
122 * This instance is persisting a previously {@link Capture}d snapshot.
124 private record PersistCapture(long lastSequenceNumber) implements Persist {
// Actor context: gives access to the replicated log, persistence, configuration and current behavior
private final RaftActorContext context;
// Logger used for all messages emitted by this manager
private final Logger log;

// Cohort used to deserialize and apply snapshot state; a no-op implementation until one is set
private RaftActorSnapshotCohort snapshotCohort = NoopRaftActorSnapshotCohort.INSTANCE;
// Procedure invoked to start creating a snapshot; null until set via setCreateSnapshotConsumer()
private Consumer<Optional<OutputStream>> createSnapshotProcedure = null;
// The task currently in progress, Idle when there is none
private @NonNull Task task = Idle.INSTANCE;
/**
 * Constructs an instance.
 *
 * @param context the RaftActorContext, must not be {@code null}
 * @param logger the Logger, must not be {@code null}
 * @throws NullPointerException if either argument is {@code null}
 */
public SnapshotManager(final RaftActorContext context, final Logger logger) {
    // Fail fast: both are dereferenced by nearly every method, a null here would only surface much later
    this.context = requireNonNull(context, "context");
    this.log = requireNonNull(logger, "logger");
}
146 public boolean isApplying() {
147 return task instanceof PersistApply;
151 public boolean isCapturing() {
152 return !(task instanceof Idle);
156 public boolean capture(final RaftEntryMeta lastLogEntry, final long replicatedToAllIndex) {
157 if (task instanceof Idle) {
158 return capture(lastLogEntry, replicatedToAllIndex, null, false);
160 log.debug("capture should not be called in state {}", task);
164 private boolean capture(final RaftEntryMeta lastLogEntry, final long replicatedToAllIndex,
165 final String targetFollower, final boolean mandatoryTrim) {
166 final var request = newCaptureSnapshot(lastLogEntry, replicatedToAllIndex, mandatoryTrim);
168 final OutputStream installSnapshotStream;
169 if (targetFollower != null) {
170 installSnapshotStream = context.getFileBackedOutputStreamFactory().newInstance();
171 log.info("{}: Initiating snapshot capture {} to install on {}",
172 persistenceId(), request, targetFollower);
174 installSnapshotStream = null;
175 log.info("{}: Initiating snapshot capture {}", persistenceId(), request);
178 final var lastSeq = context.getPersistenceProvider().getLastSequenceNumber();
180 log.debug("{}: lastSequenceNumber prior to capture: {}", persistenceId(), lastSeq);
182 task = new Capture(lastSeq, request);
183 return capture(installSnapshotStream);
186 @SuppressWarnings("checkstyle:IllegalCatch")
187 private boolean capture(final @Nullable OutputStream installSnapshotStream) {
189 createSnapshotProcedure.accept(Optional.ofNullable(installSnapshotStream));
190 } catch (Exception e) {
191 task = Idle.INSTANCE;
192 log.error("Error creating snapshot", e);
199 public boolean captureToInstall(final RaftEntryMeta lastLogEntry, final long replicatedToAllIndex,
200 final String targetFollower) {
201 if (task instanceof Idle) {
202 return capture(lastLogEntry, replicatedToAllIndex, targetFollower, false);
204 log.debug("captureToInstall should not be called in state {}", task);
209 public boolean captureWithForcedTrim(final RaftEntryMeta lastLogEntry, final long replicatedToAllIndex) {
210 if (task instanceof Idle) {
211 return capture(lastLogEntry, replicatedToAllIndex, null, true);
213 log.debug("captureWithForcedTrim should not be called in state {}", task);
218 * Applies a snapshot on a follower that was installed by the leader.
220 * @param snapshot the {@link ApplyLeaderSnapshot} to apply.
223 public void applyFromLeader(final ApplyLeaderSnapshot snapshot) {
224 if (!(task instanceof Idle)) {
225 log.debug("applySnapshot should not be called in state {}", task);
229 final var snapshotBytes = snapshot.snapshot();
230 log.info("{}: Applying snapshot on follower: {}", context.getId(), snapshotBytes);
232 final Snapshot.State snapshotState;
234 snapshotState = convertSnapshot(snapshotBytes);
235 } catch (IOException e) {
236 log.debug("{}: failed to convert InstallSnapshot to state", context.getId(), e);
237 snapshot.callback().onFailure();
241 log.debug("{}: Converted InstallSnapshot from leader: {} to state{}", context.getId(), snapshot.leaderId(),
242 snapshotState.needsMigration() ? " (needs migration)" : "");
244 Snapshot.ofTermLeader(snapshotState, snapshot.lastEntry(), context.termInfo(), snapshot.serverConfig()),
245 snapshot.callback());
249 * Applies a snapshot from recovery.
251 * @param snapshot the {@link Snapshot} to apply.
254 public void applyFromRecovery(final Snapshot snapshot) {
255 if (task instanceof Idle) {
256 persistSnapshot(requireNonNull(snapshot), null);
258 log.debug("apply should not be called in state {}", task);
263 private void persistSnapshot(final Snapshot snapshot, final ApplyLeaderSnapshot.@Nullable Callback callback) {
264 final var persistence = context.getPersistenceProvider();
265 final var lastSeq = persistence.getLastSequenceNumber();
266 final var persisting = new PersistApply(lastSeq, snapshot, callback);
269 log.debug("lastSequenceNumber prior to persisting applied snapshot: {}", lastSeq);
270 context.getPersistenceProvider().saveSnapshot(persisting.snapshot);
274 public void persist(final Snapshot.State snapshotState, final Optional<OutputStream> installSnapshotStream,
275 final long totalMemory) {
276 if (!(task instanceof Capture(final var lastSeq, final var request))) {
277 log.debug("persist should not be called in state {}", this);
281 // create a snapshot object from the state provided and save it when snapshot is saved async,
282 // SaveSnapshotSuccess is raised.
283 final var snapshot = Snapshot.create(snapshotState, request.getUnAppliedEntries(),
284 request.getLastIndex(), request.getLastTerm(),
285 request.getLastAppliedIndex(), request.getLastAppliedTerm(),
286 context.termInfo(), context.getPeerServerInfo(true));
288 context.getPersistenceProvider().saveSnapshot(snapshot);
290 log.info("{}: Persisting of snapshot done: {}", persistenceId(), snapshot);
292 final var config = context.getConfigParams();
293 final long absoluteThreshold = config.getSnapshotDataThreshold();
294 final long dataThreshold = absoluteThreshold != 0 ? absoluteThreshold * ConfigParams.MEGABYTE
295 : totalMemory * config.getSnapshotDataThresholdPercentage() / 100;
297 final var replLog = context.getReplicatedLog();
298 final boolean dataSizeThresholdExceeded = replLog.dataSize() > dataThreshold;
299 final boolean logSizeExceededSnapshotBatchCount = replLog.size() >= config.getSnapshotBatchCount();
301 final var currentBehavior = context.getCurrentBehavior();
302 if (dataSizeThresholdExceeded || logSizeExceededSnapshotBatchCount || request.isMandatoryTrim()) {
303 if (log.isDebugEnabled()) {
304 if (dataSizeThresholdExceeded) {
305 log.debug("{}: log data size {} exceeds the memory threshold {} - doing snapshotPreCommit "
306 + "with index {}", context.getId(), replLog.dataSize(), dataThreshold,
307 request.getLastAppliedIndex());
308 } else if (logSizeExceededSnapshotBatchCount) {
309 log.debug("{}: log size {} exceeds the snapshot batch count {} - doing snapshotPreCommit with "
310 + "index {}", context.getId(), replLog.size(), config.getSnapshotBatchCount(),
311 request.getLastAppliedIndex());
313 log.debug("{}: user triggered or root overwrite snapshot encountered, trimming log up to "
314 + "last applied index {}", context.getId(), request.getLastAppliedIndex());
318 // We either exceeded the memory threshold or the log size exceeded the snapshot batch
319 // count so, to keep the log memory footprint in check, clear the log based on lastApplied.
320 // This could/should only happen if one of the followers is down as normally we keep
321 // removing from the log as entries are replicated to all.
322 replLog.snapshotPreCommit(request.getLastAppliedIndex(), request.getLastAppliedTerm());
324 // Don't reset replicatedToAllIndex to -1 as this may prevent us from trimming the log after an
325 // install snapshot to a follower.
326 if (request.getReplicatedToAllIndex() >= 0) {
327 currentBehavior.setReplicatedToAllIndex(request.getReplicatedToAllIndex());
330 } else if (request.getReplicatedToAllIndex() != -1) {
331 // clear the log based on replicatedToAllIndex
332 replLog.snapshotPreCommit(request.getReplicatedToAllIndex(),
333 request.getReplicatedToAllTerm());
335 currentBehavior.setReplicatedToAllIndex(request.getReplicatedToAllIndex());
337 // The replicatedToAllIndex was not found in the log
338 // This means that replicatedToAllIndex never moved beyond -1 or that it is already in the snapshot.
339 // In this scenario we may need to save the snapshot to the akka persistence
340 // snapshot for recovery but we do not need to do the replicated log trimming.
341 replLog.snapshotPreCommit(replLog.getSnapshotIndex(), replLog.getSnapshotTerm());
344 log.info("{}: Removed in-memory snapshotted entries, adjusted snaphsotIndex: {} and term: {}",
345 context.getId(), replLog.getSnapshotIndex(), replLog.getSnapshotTerm());
347 if (installSnapshotStream.isPresent()) {
349 final var snapshotStream = (FileBackedOutputStream) installSnapshotStream.orElseThrow();
351 if (context.getId().equals(currentBehavior.getLeaderId())) {
353 ByteSource snapshotBytes = snapshotStream.asByteSource();
354 currentBehavior.handleMessage(context.getActor(), new SendInstallSnapshot(snapshot, snapshotBytes));
355 } catch (IOException e) {
356 log.error("{}: Snapshot install failed due to an unrecoverable streaming error",
360 snapshotStream.cleanup();
364 task = new PersistCapture(lastSeq);
368 public void commit(final long sequenceNumber, final long timeStamp) {
369 if (!(task instanceof Persist persist)) {
370 log.debug("commit should not be called in state {}", task);
374 log.debug("{}: Snapshot success - sequence number: {}", persistenceId(), sequenceNumber);
375 final var lastSequenceNumber = commit(persist);
377 final var persistence = context.getPersistenceProvider();
378 persistence.deleteSnapshots(new SnapshotSelectionCriteria(scala.Long.MaxValue(), timeStamp - 1, 0L, 0L));
379 persistence.deleteMessages(lastSequenceNumber);
384 @SuppressWarnings("checkstyle:IllegalCatch")
385 private long commit(final Persist persist) {
386 return switch (persist) {
387 case PersistApply(var lastSeq, var snapshot, var callback) -> {
389 // clears the followers log, sets the snapshot index to ensure adjusted-index works
390 context.setReplicatedLog(ReplicatedLogImpl.newInstance(snapshot, context));
391 context.setLastApplied(snapshot.getLastAppliedIndex());
392 context.setCommitIndex(snapshot.getLastAppliedIndex());
393 // FIXME: This may be coming from the leader: we do not want to pollute our TermInfo if it is for
394 // this term: we may need to know who we voted for in the next elections.
395 // This behavior means we report as if we voted for the leader.
396 context.setTermInfo(snapshot.termInfo());
398 final var serverConfig = snapshot.getServerConfiguration();
399 if (serverConfig != null) {
400 context.updatePeerIds(serverConfig);
403 final var state = snapshot.getState();
404 if (state != null && !(state instanceof EmptyState)) {
405 snapshotCohort.applySnapshot(state);
408 if (callback != null) {
409 callback.onSuccess();
411 } catch (Exception e) {
412 log.error("{}: Error applying snapshot", context.getId(), e);
416 case PersistCapture(final var lastSeq) -> {
417 context.getReplicatedLog().snapshotCommit();
424 public void rollback() {
426 case PersistApply persist -> {
427 // Nothing to rollback if we're applying a snapshot from the leader.
428 final var callback = persist.callback;
429 if (callback != null) {
430 callback.onFailure();
434 case PersistCapture persist -> {
435 final var replLog = context.getReplicatedLog();
436 replLog.snapshotRollback();
437 log.info("{}: Replicated Log rolled back. Snapshot will be attempted in the next cycle. "
438 + "snapshotIndex:{}, snapshotTerm:{}, log-size:{}", persistenceId(), replLog.getSnapshotIndex(),
439 replLog.getSnapshotTerm(), replLog.size());
442 default -> log.debug("rollback should not be called in state {}", task);
446 private void snapshotComplete() {
447 task = Idle.INSTANCE;
448 context.getActor().tell(SnapshotComplete.INSTANCE, context.getActor());
452 public long trimLog(final long desiredTrimIndex) {
453 if (!(task instanceof Idle)) {
454 log.debug("trimLog should not be called in state {}", task);
458 // we would want to keep the lastApplied as its used while capturing snapshots
459 long lastApplied = context.getLastApplied();
460 long tempMin = Math.min(desiredTrimIndex, lastApplied > -1 ? lastApplied - 1 : -1);
462 if (log.isTraceEnabled()) {
463 log.trace("{}: performSnapshotWithoutCapture: desiredTrimIndex: {}, lastApplied: {}, tempMin: {}",
464 persistenceId(), desiredTrimIndex, lastApplied, tempMin);
468 final var replLog = context.getReplicatedLog();
469 if (replLog.isPresent(tempMin)) {
470 log.debug("{}: fakeSnapshot purging log to {} for term {}", persistenceId(), tempMin,
471 context.currentTerm());
473 //use the term of the temp-min, since we check for isPresent, entry will not be null
474 final var entry = replLog.get(tempMin);
475 replLog.snapshotPreCommit(tempMin, entry.term());
476 replLog.snapshotCommit(false);
481 final var currentBehavior = context.getCurrentBehavior();
482 if (tempMin > currentBehavior.getReplicatedToAllIndex()) {
483 // It is possible a follower was lagging and an install snapshot advanced its match index past the current
484 // replicatedToAllIndex. Since the follower is now caught up we should advance the replicatedToAllIndex
485 // (to tempMin). The fact that tempMin was not found in the log is likely due to a previous snapshot
486 // triggered by the memory threshold exceeded, in that case we trim the log to the last applied index even
487 // if previous entries weren't replicated to all followers.
488 currentBehavior.setReplicatedToAllIndex(tempMin);
493 @SuppressWarnings("checkstyle:hiddenField")
494 void setCreateSnapshotConsumer(final Consumer<Optional<OutputStream>> createSnapshotProcedure) {
495 this.createSnapshotProcedure = createSnapshotProcedure;
498 void setSnapshotCohort(final RaftActorSnapshotCohort snapshotCohort) {
499 this.snapshotCohort = snapshotCohort;
502 public Snapshot.@NonNull State convertSnapshot(final ByteSource snapshotBytes) throws IOException {
503 return snapshotCohort.deserializeSnapshot(snapshotBytes);
506 public long getLastSequenceNumber() {
507 return task instanceof Persist persist ? persist.lastSequenceNumber() : -1;
511 public @Nullable CaptureSnapshot getCaptureSnapshot() {
512 return task instanceof Capture capture ? capture.request : null;
515 private String persistenceId() {
516 return context.getId();
520 * Constructs a CaptureSnapshot instance.
522 * @param lastLogEntry the last log entry for the snapshot.
523 * @param replicatedToAllIndex the index of the last entry replicated to all followers.
524 * @return a new CaptureSnapshot instance.
526 public @NonNull CaptureSnapshot newCaptureSnapshot(final RaftEntryMeta lastLogEntry,
527 final long replicatedToAllIndex, final boolean mandatoryTrim) {
528 final var replLog = context.getReplicatedLog();
529 final var lastAppliedEntry = computeLastAppliedEntry(lastLogEntry);
531 final var entry = replLog.get(replicatedToAllIndex);
532 final var replicatedToAllEntry = entry != null ? entry : ImmutableRaftEntryMeta.of(-1, -1);
534 long lastAppliedIndex = lastAppliedEntry.index();
535 long lastAppliedTerm = lastAppliedEntry.term();
537 final var unAppliedEntries = replLog.getFrom(lastAppliedIndex + 1);
539 final long lastLogEntryIndex;
540 final long lastLogEntryTerm;
541 if (lastLogEntry == null) {
542 // When we don't have journal present, for example two captureSnapshots executed right after another with no
543 // new journal we still want to preserve the index and term in the snapshot.
544 lastAppliedIndex = lastLogEntryIndex = replLog.getSnapshotIndex();
545 lastAppliedTerm = lastLogEntryTerm = replLog.getSnapshotTerm();
547 log.debug("{}: Capturing Snapshot : lastLogEntry is null. Using snapshot values lastAppliedIndex {} and "
548 + "lastAppliedTerm {} instead.", persistenceId(), lastAppliedIndex, lastAppliedTerm);
550 lastLogEntryIndex = lastLogEntry.index();
551 lastLogEntryTerm = lastLogEntry.term();
554 return new CaptureSnapshot(lastLogEntryIndex, lastLogEntryTerm, lastAppliedIndex, lastAppliedTerm,
555 replicatedToAllEntry.index(), replicatedToAllEntry.term(), unAppliedEntries, mandatoryTrim);
559 private RaftEntryMeta computeLastAppliedEntry(final @Nullable RaftEntryMeta lastLogEntry) {
560 return computeLastAppliedEntry(context.getReplicatedLog(), context.getLastApplied(), lastLogEntry,
561 context.hasFollowers());
566 static RaftEntryMeta computeLastAppliedEntry(final ReplicatedLog log, final long originalIndex,
567 final @Nullable RaftEntryMeta lastLogEntry, final boolean hasFollowers) {
569 final var entry = log.lookupMeta(originalIndex);
574 final var snapshotIndex = log.getSnapshotIndex();
575 if (snapshotIndex > -1) {
576 return ImmutableRaftEntryMeta.of(snapshotIndex, log.getSnapshotTerm());
578 } else if (lastLogEntry != null) {
579 // since we have persisted the last-log-entry to persistent journal before the capture, we would want
580 // to snapshot from this entry.
584 return ImmutableRaftEntryMeta.of(-1, -1);