50e5d2ec1b3ed327d77fad8434883a84c166403f
[controller.git] /
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.raft;
9
10 import static java.util.Objects.requireNonNull;
11
12 import com.google.common.annotations.VisibleForTesting;
13 import com.google.common.io.ByteSource;
14 import java.io.IOException;
15 import java.io.OutputStream;
16 import java.util.Optional;
17 import java.util.function.Consumer;
18 import org.apache.pekko.persistence.SnapshotSelectionCriteria;
19 import org.eclipse.jdt.annotation.NonNull;
20 import org.eclipse.jdt.annotation.NonNullByDefault;
21 import org.eclipse.jdt.annotation.Nullable;
22 import org.opendaylight.controller.cluster.io.FileBackedOutputStream;
23 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
24 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
25 import org.opendaylight.controller.cluster.raft.base.messages.SnapshotComplete;
26 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
27 import org.opendaylight.controller.cluster.raft.persisted.EmptyState;
28 import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
29 import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
30 import org.opendaylight.controller.cluster.raft.spi.ImmutableRaftEntryMeta;
31 import org.opendaylight.controller.cluster.raft.spi.RaftEntryMeta;
32 import org.slf4j.Logger;
33
34 /**
35  * Manages the capturing of snapshots for a RaftActor.
36  *
37  * @author Moiz Raja
38  * @author Thomas Pantelis
39  */
40 public class SnapshotManager implements SnapshotState {
    /**
     * Internal message, issued by follower behavior to its actor, eventually routed to {@link SnapshotManager}.
     * Metadata matches information conveyed in {@link InstallSnapshot}.
     *
     * <p>All components except {@code serverConfig} are required: the compact constructor rejects {@code null} for
     * {@code leaderId}, {@code lastEntry}, {@code snapshot} and {@code callback}.
     *
     * @param leaderId identifier of the leader which sent the snapshot
     * @param term the term conveyed with the snapshot; not validated against {@code lastEntry} (see TODO below)
     * @param lastEntry metadata of the last entry covered by the snapshot
     * @param snapshot serialized snapshot state, handed to {@link SnapshotManager#convertSnapshot(ByteSource)}
     * @param serverConfig server configuration carried with the snapshot, or {@code null} if there is none
     * @param callback notified about the outcome of applying the snapshot
     */
    @NonNullByDefault
    public record ApplyLeaderSnapshot(
            String leaderId,
            long term,
            ImmutableRaftEntryMeta lastEntry,
            ByteSource snapshot,
            @Nullable ServerConfigurationPayload serverConfig,
            ApplyLeaderSnapshot.Callback callback) {
        public ApplyLeaderSnapshot {
            requireNonNull(leaderId);
            requireNonNull(lastEntry);
            requireNonNull(snapshot);
            requireNonNull(callback);
            // TODO: sanity check term vs. lastEntry ?
        }

        /**
         * Completion callback for an {@link ApplyLeaderSnapshot} request.
         */
        public interface Callback {

            /**
             * Invoked by {@link SnapshotManager} once the snapshot has been persisted and applied to local state.
             */
            void onSuccess();

            /**
             * Invoked by {@link SnapshotManager} when the snapshot bytes cannot be converted to state or when
             * persisting the snapshot is rolled back.
             */
            void onFailure();
        }
    }
68
    /**
     * The task being executed by this instance. This is a closed hierarchy: the manager's behavior is selected by
     * checking which concrete task is current ({@link Idle}, {@link Capture} or a {@link Persist} variant).
     */
    private sealed interface Task {
        // Marker only: all state lives in the implementations
    }
75
    /**
     * This instance is capturing current user state, for example to save a state snapshot prior to purging journal
     * entries.
     *
     * @param lastSequenceNumber last sequence number reported by the persistence provider when the capture was
     *        initiated; carried over into {@link PersistCapture} once the captured state is handed to persistence
     * @param request the {@link CaptureSnapshot} describing log indices and terms at the time of capture
     */
    @NonNullByDefault
    private record Capture(long lastSequenceNumber, CaptureSnapshot request) implements Task {
        Capture {
            requireNonNull(request);
        }
    }
86
    /**
     * This instance is just sitting here, doing nothing in particular. Captures, applies and log trimming are only
     * accepted while this is the current task.
     */
    @NonNullByDefault
    private static final class Idle implements Task {
        // The only instance created in this class; used as the initial and the post-completion task
        private static final Idle INSTANCE = new Idle();
    }
94
    /**
     * This instance is talking to persistence for some reason. The sequence number persistence was at is available
     * through {@link #lastSequenceNumber()}; it is what {@code commit()} passes to {@code deleteMessages()} once the
     * snapshot has been saved.
     */
    private sealed interface Persist extends Task {
        /**
         * Returns the last sequence number reported by persistence for this task. For {@link PersistApply} it is read
         * immediately before the snapshot is saved; for {@link PersistCapture} it is the value recorded when the
         * preceding {@link Capture} was initiated.
         *
         * @return persistence last sequence number
         */
        long lastSequenceNumber();
    }
107
    /**
     * This instance is persisting a {@link Snapshot} which is to be applied to local state once saved. The snapshot
     * originates either from an {@link ApplyLeaderSnapshot} or from recovery.
     *
     * @param lastSequenceNumber last sequence number reported by persistence just before the save was requested
     * @param snapshot the snapshot being saved and subsequently applied
     * @param callback outcome callback, or {@code null} when the snapshot comes from recovery
     */
    @NonNullByDefault
    private record PersistApply(
            long lastSequenceNumber,
            Snapshot snapshot,
            ApplyLeaderSnapshot.@Nullable Callback callback) implements Persist {
        PersistApply {
            requireNonNull(snapshot);
        }
    }
120
    /**
     * This instance is persisting a previously {@link Capture}d snapshot.
     *
     * @param lastSequenceNumber the sequence number recorded by the {@link Capture} task this one replaces
     */
    private record PersistCapture(long lastSequenceNumber) implements Persist {
        // Nothing else
    }
127
    // Actor context: source of the persistence provider, replicated log, current behavior and configuration
    private final RaftActorContext context;
    // Logger supplied by the owner at construction time
    private final Logger log;

    // Cohort used to deserialize and apply snapshot state; defaults to NoopRaftActorSnapshotCohort.INSTANCE until
    // setSnapshotCohort() is called
    private RaftActorSnapshotCohort snapshotCohort = NoopRaftActorSnapshotCohort.INSTANCE;
    // Procedure invoked to start creating a snapshot; null until setCreateSnapshotConsumer() is called
    private Consumer<Optional<OutputStream>> createSnapshotProcedure = null;
    // What this manager is currently doing; starts out, and ends up after every completed snapshot, as Idle
    private @NonNull Task task = Idle.INSTANCE;
134
135     /**
136      * Constructs an instance.
137      *
138      * @param context the RaftActorContext
139      * @param logger the Logger
140      */
141     public SnapshotManager(final RaftActorContext context, final Logger logger) {
142         this.context = context;
143         log = logger;
144     }
145
    /**
     * Checks whether a snapshot is currently being persisted in order to be applied to local state.
     *
     * @return {@code true} if the current task is a {@code PersistApply}
     */
    public boolean isApplying() {
        return task instanceof PersistApply;
    }
149
    // Note: reports true for ANY non-idle task, i.e. also while a snapshot is being persisted or applied,
    // not only while a capture proper is in progress.
    @Override
    public boolean isCapturing() {
        return !(task instanceof Idle);
    }
154
155     @Override
156     public boolean capture(final RaftEntryMeta lastLogEntry, final long replicatedToAllIndex) {
157         if (task instanceof Idle) {
158             return capture(lastLogEntry, replicatedToAllIndex, null, false);
159         }
160         log.debug("capture should not be called in state {}", task);
161         return false;
162     }
163
164     private boolean capture(final RaftEntryMeta lastLogEntry, final long replicatedToAllIndex,
165             final String targetFollower, final boolean mandatoryTrim) {
166         final var request = newCaptureSnapshot(lastLogEntry, replicatedToAllIndex, mandatoryTrim);
167
168         final OutputStream installSnapshotStream;
169         if (targetFollower != null) {
170             installSnapshotStream = context.getFileBackedOutputStreamFactory().newInstance();
171             log.info("{}: Initiating snapshot capture {} to install on {}",
172                     persistenceId(), request, targetFollower);
173         } else {
174             installSnapshotStream = null;
175             log.info("{}: Initiating snapshot capture {}", persistenceId(), request);
176         }
177
178         final var lastSeq = context.getPersistenceProvider().getLastSequenceNumber();
179
180         log.debug("{}: lastSequenceNumber prior to capture: {}", persistenceId(), lastSeq);
181
182         task = new Capture(lastSeq, request);
183         return capture(installSnapshotStream);
184     }
185
186     @SuppressWarnings("checkstyle:IllegalCatch")
187     private boolean capture(final @Nullable OutputStream installSnapshotStream) {
188         try {
189             createSnapshotProcedure.accept(Optional.ofNullable(installSnapshotStream));
190         } catch (Exception e) {
191             task = Idle.INSTANCE;
192             log.error("Error creating snapshot", e);
193             return false;
194         }
195         return true;
196     }
197
198     @Override
199     public boolean captureToInstall(final RaftEntryMeta lastLogEntry, final long replicatedToAllIndex,
200             final String targetFollower) {
201         if (task instanceof Idle) {
202             return capture(lastLogEntry, replicatedToAllIndex, targetFollower, false);
203         }
204         log.debug("captureToInstall should not be called in state {}", task);
205         return false;
206     }
207
208     @Override
209     public boolean captureWithForcedTrim(final RaftEntryMeta lastLogEntry, final long replicatedToAllIndex) {
210         if (task instanceof Idle) {
211             return capture(lastLogEntry, replicatedToAllIndex, null, true);
212         }
213         log.debug("captureWithForcedTrim should not be called in state {}", task);
214         return false;
215     }
216
217     /**
218      * Applies a snapshot on a follower that was installed by the leader.
219      *
220      * @param snapshot the {@link ApplyLeaderSnapshot} to apply.
221      */
222     @NonNullByDefault
223     public void applyFromLeader(final ApplyLeaderSnapshot snapshot) {
224         if (!(task instanceof Idle)) {
225             log.debug("applySnapshot should not be called in state {}", task);
226             return;
227         }
228
229         final var snapshotBytes = snapshot.snapshot();
230         log.info("{}: Applying snapshot on follower: {}", context.getId(), snapshotBytes);
231
232         final Snapshot.State snapshotState;
233         try {
234             snapshotState = convertSnapshot(snapshotBytes);
235         } catch (IOException e) {
236             log.debug("{}: failed to convert InstallSnapshot to state", context.getId(), e);
237             snapshot.callback().onFailure();
238             return;
239         }
240
241         log.debug("{}: Converted InstallSnapshot from leader: {} to state{}", context.getId(), snapshot.leaderId(),
242             snapshotState.needsMigration() ? " (needs migration)" : "");
243         persistSnapshot(
244             Snapshot.ofTermLeader(snapshotState, snapshot.lastEntry(), context.termInfo(), snapshot.serverConfig()),
245             snapshot.callback());
246     }
247
248     /**
249      * Applies a snapshot from recovery.
250      *
251      * @param snapshot the {@link Snapshot} to apply.
252      */
253     @NonNullByDefault
254     public void applyFromRecovery(final Snapshot snapshot) {
255         if (task instanceof Idle) {
256             persistSnapshot(requireNonNull(snapshot), null);
257         } else {
258             log.debug("apply should not be called in state {}", task);
259         }
260     }
261
262     @NonNullByDefault
263     private void persistSnapshot(final Snapshot snapshot, final ApplyLeaderSnapshot.@Nullable Callback callback) {
264         final var persistence = context.getPersistenceProvider();
265         final var lastSeq = persistence.getLastSequenceNumber();
266         final var persisting = new PersistApply(lastSeq, snapshot, callback);
267
268         task = persisting;
269         log.debug("lastSequenceNumber prior to persisting applied snapshot: {}", lastSeq);
270         context.getPersistenceProvider().saveSnapshot(persisting.snapshot);
271     }
272
273     @Override
274     public void persist(final Snapshot.State snapshotState, final Optional<OutputStream> installSnapshotStream,
275             final long totalMemory) {
276         if (!(task instanceof Capture(final var lastSeq, final var request))) {
277             log.debug("persist should not be called in state {}", this);
278             return;
279         }
280
281         // create a snapshot object from the state provided and save it when snapshot is saved async,
282         // SaveSnapshotSuccess is raised.
283         final var snapshot = Snapshot.create(snapshotState, request.getUnAppliedEntries(),
284                 request.getLastIndex(), request.getLastTerm(),
285                 request.getLastAppliedIndex(), request.getLastAppliedTerm(),
286                 context.termInfo(), context.getPeerServerInfo(true));
287
288         context.getPersistenceProvider().saveSnapshot(snapshot);
289
290         log.info("{}: Persisting of snapshot done: {}", persistenceId(), snapshot);
291
292         final var config = context.getConfigParams();
293         final long absoluteThreshold = config.getSnapshotDataThreshold();
294         final long dataThreshold = absoluteThreshold != 0 ? absoluteThreshold * ConfigParams.MEGABYTE
295                 : totalMemory * config.getSnapshotDataThresholdPercentage() / 100;
296
297         final var replLog = context.getReplicatedLog();
298         final boolean dataSizeThresholdExceeded = replLog.dataSize() > dataThreshold;
299         final boolean logSizeExceededSnapshotBatchCount = replLog.size() >= config.getSnapshotBatchCount();
300
301         final var currentBehavior = context.getCurrentBehavior();
302         if (dataSizeThresholdExceeded || logSizeExceededSnapshotBatchCount || request.isMandatoryTrim()) {
303             if (log.isDebugEnabled()) {
304                 if (dataSizeThresholdExceeded) {
305                     log.debug("{}: log data size {} exceeds the memory threshold {} - doing snapshotPreCommit "
306                             + "with index {}", context.getId(), replLog.dataSize(), dataThreshold,
307                             request.getLastAppliedIndex());
308                 } else if (logSizeExceededSnapshotBatchCount) {
309                     log.debug("{}: log size {} exceeds the snapshot batch count {} - doing snapshotPreCommit with "
310                             + "index {}", context.getId(), replLog.size(), config.getSnapshotBatchCount(),
311                             request.getLastAppliedIndex());
312                 } else {
313                     log.debug("{}: user triggered or root overwrite snapshot encountered, trimming log up to "
314                             + "last applied index {}", context.getId(), request.getLastAppliedIndex());
315                 }
316             }
317
318             // We either exceeded the memory threshold or the log size exceeded the snapshot batch
319             // count so, to keep the log memory footprint in check, clear the log based on lastApplied.
320             // This could/should only happen if one of the followers is down as normally we keep
321             // removing from the log as entries are replicated to all.
322             replLog.snapshotPreCommit(request.getLastAppliedIndex(), request.getLastAppliedTerm());
323
324             // Don't reset replicatedToAllIndex to -1 as this may prevent us from trimming the log after an
325             // install snapshot to a follower.
326             if (request.getReplicatedToAllIndex() >= 0) {
327                 currentBehavior.setReplicatedToAllIndex(request.getReplicatedToAllIndex());
328             }
329
330         } else if (request.getReplicatedToAllIndex() != -1) {
331             // clear the log based on replicatedToAllIndex
332             replLog.snapshotPreCommit(request.getReplicatedToAllIndex(),
333                 request.getReplicatedToAllTerm());
334
335             currentBehavior.setReplicatedToAllIndex(request.getReplicatedToAllIndex());
336         } else {
337             // The replicatedToAllIndex was not found in the log
338             // This means that replicatedToAllIndex never moved beyond -1 or that it is already in the snapshot.
339             // In this scenario we may need to save the snapshot to the akka persistence
340             // snapshot for recovery but we do not need to do the replicated log trimming.
341             replLog.snapshotPreCommit(replLog.getSnapshotIndex(), replLog.getSnapshotTerm());
342         }
343
344         log.info("{}: Removed in-memory snapshotted entries, adjusted snaphsotIndex: {} and term: {}",
345                 context.getId(), replLog.getSnapshotIndex(), replLog.getSnapshotTerm());
346
347         if (installSnapshotStream.isPresent()) {
348             // FIXME: ugly cast
349             final var snapshotStream = (FileBackedOutputStream) installSnapshotStream.orElseThrow();
350
351             if (context.getId().equals(currentBehavior.getLeaderId())) {
352                 try {
353                     ByteSource snapshotBytes = snapshotStream.asByteSource();
354                     currentBehavior.handleMessage(context.getActor(), new SendInstallSnapshot(snapshot, snapshotBytes));
355                 } catch (IOException e) {
356                     log.error("{}: Snapshot install failed due to an unrecoverable streaming error",
357                             context.getId(), e);
358                 }
359             } else {
360                 snapshotStream.cleanup();
361             }
362         }
363
364         task = new PersistCapture(lastSeq);
365     }
366
367     @Override
368     public void commit(final long sequenceNumber, final long timeStamp) {
369         if (!(task instanceof Persist persist)) {
370             log.debug("commit should not be called in state {}", task);
371             return;
372         }
373
374         log.debug("{}: Snapshot success -  sequence number: {}", persistenceId(), sequenceNumber);
375         final var lastSequenceNumber = commit(persist);
376
377         final var persistence = context.getPersistenceProvider();
378         persistence.deleteSnapshots(new SnapshotSelectionCriteria(scala.Long.MaxValue(), timeStamp - 1, 0L, 0L));
379         persistence.deleteMessages(lastSequenceNumber);
380
381         snapshotComplete();
382     }
383
384     @SuppressWarnings("checkstyle:IllegalCatch")
385     private long commit(final Persist persist) {
386         return switch (persist) {
387             case PersistApply(var lastSeq, var snapshot, var callback) -> {
388                 try {
389                     // clears the followers log, sets the snapshot index to ensure adjusted-index works
390                     context.setReplicatedLog(ReplicatedLogImpl.newInstance(snapshot, context));
391                     context.setLastApplied(snapshot.getLastAppliedIndex());
392                     context.setCommitIndex(snapshot.getLastAppliedIndex());
393                     // FIXME: This may be coming from the leader: we do not want to pollute our TermInfo if it is for
394                     //        this term: we may need to know who we voted for in the next elections.
395                     //        This behavior means we report as if we voted for the leader.
396                     context.setTermInfo(snapshot.termInfo());
397
398                     final var serverConfig = snapshot.getServerConfiguration();
399                     if (serverConfig != null) {
400                         context.updatePeerIds(serverConfig);
401                     }
402
403                     final var state = snapshot.getState();
404                     if (state != null && !(state instanceof EmptyState)) {
405                         snapshotCohort.applySnapshot(state);
406                     }
407
408                     if (callback != null) {
409                         callback.onSuccess();
410                     }
411                 } catch (Exception e) {
412                     log.error("{}: Error applying snapshot", context.getId(), e);
413                 }
414                 yield lastSeq;
415             }
416             case PersistCapture(final var lastSeq) -> {
417                 context.getReplicatedLog().snapshotCommit();
418                 yield lastSeq;
419             }
420         };
421     }
422
423     @Override
424     public void rollback() {
425         switch (task) {
426             case PersistApply persist -> {
427                 // Nothing to rollback if we're applying a snapshot from the leader.
428                 final var callback = persist.callback;
429                 if (callback != null)  {
430                     callback.onFailure();
431                 }
432                 snapshotComplete();
433             }
434             case PersistCapture persist -> {
435                 final var replLog = context.getReplicatedLog();
436                 replLog.snapshotRollback();
437                 log.info("{}: Replicated Log rolled back. Snapshot will be attempted in the next cycle. "
438                         + "snapshotIndex:{}, snapshotTerm:{}, log-size:{}", persistenceId(), replLog.getSnapshotIndex(),
439                         replLog.getSnapshotTerm(), replLog.size());
440                 snapshotComplete();
441             }
442             default -> log.debug("rollback should not be called in state {}", task);
443         }
444     }
445
446     private void snapshotComplete() {
447         task = Idle.INSTANCE;
448         context.getActor().tell(SnapshotComplete.INSTANCE, context.getActor());
449     }
450
451     @Override
452     public long trimLog(final long desiredTrimIndex) {
453         if (!(task instanceof Idle)) {
454             log.debug("trimLog should not be called in state {}", task);
455             return -1;
456         }
457
458         //  we would want to keep the lastApplied as its used while capturing snapshots
459         long lastApplied = context.getLastApplied();
460         long tempMin = Math.min(desiredTrimIndex, lastApplied > -1 ? lastApplied - 1 : -1);
461
462         if (log.isTraceEnabled()) {
463             log.trace("{}: performSnapshotWithoutCapture: desiredTrimIndex: {}, lastApplied: {}, tempMin: {}",
464                     persistenceId(), desiredTrimIndex, lastApplied, tempMin);
465         }
466
467         if (tempMin > -1) {
468             final var replLog = context.getReplicatedLog();
469             if (replLog.isPresent(tempMin)) {
470                 log.debug("{}: fakeSnapshot purging log to {} for term {}", persistenceId(), tempMin,
471                     context.currentTerm());
472
473                 //use the term of the temp-min, since we check for isPresent, entry will not be null
474                 final var entry = replLog.get(tempMin);
475                 replLog.snapshotPreCommit(tempMin, entry.term());
476                 replLog.snapshotCommit(false);
477                 return tempMin;
478             }
479         }
480
481         final var currentBehavior = context.getCurrentBehavior();
482         if (tempMin > currentBehavior.getReplicatedToAllIndex()) {
483             // It is possible a follower was lagging and an install snapshot advanced its match index past the current
484             // replicatedToAllIndex. Since the follower is now caught up we should advance the replicatedToAllIndex
485             // (to tempMin). The fact that tempMin was not found in the log is likely due to a previous snapshot
486             // triggered by the memory threshold exceeded, in that case we trim the log to the last applied index even
487             // if previous entries weren't replicated to all followers.
488             currentBehavior.setReplicatedToAllIndex(tempMin);
489         }
490         return -1;
491     }
492
    /**
     * Sets the procedure invoked to start creating a snapshot whenever a capture is initiated. The procedure
     * receives the optional stream allocated for a snapshot destined to be installed on a follower.
     *
     * @param createSnapshotProcedure the procedure to invoke
     */
    @SuppressWarnings("checkstyle:hiddenField")
    void setCreateSnapshotConsumer(final Consumer<Optional<OutputStream>> createSnapshotProcedure) {
        this.createSnapshotProcedure = createSnapshotProcedure;
    }
497
    /**
     * Sets the cohort used to deserialize snapshot bytes and to apply snapshot state.
     *
     * @param snapshotCohort the cohort to use
     */
    void setSnapshotCohort(final RaftActorSnapshotCohort snapshotCohort) {
        this.snapshotCohort = snapshotCohort;
    }
501
    /**
     * Converts serialized snapshot bytes to snapshot state by delegating to the configured snapshot cohort.
     *
     * @param snapshotBytes the serialized snapshot
     * @return the deserialized state
     * @throws IOException if the cohort fails to deserialize the bytes
     */
    public Snapshot.@NonNull State convertSnapshot(final ByteSource snapshotBytes) throws IOException {
        return snapshotCohort.deserializeSnapshot(snapshotBytes);
    }
505
506     public long getLastSequenceNumber() {
507         return task instanceof Persist persist ? persist.lastSequenceNumber() : -1;
508     }
509
510     @VisibleForTesting
511     public @Nullable CaptureSnapshot getCaptureSnapshot() {
512         return task instanceof Capture capture ? capture.request : null;
513     }
514
    // Identifier used as the prefix of log messages; this is simply the context's id
    private String persistenceId() {
        return context.getId();
    }
518
519     /**
520      * Constructs a CaptureSnapshot instance.
521      *
522      * @param lastLogEntry the last log entry for the snapshot.
523      * @param replicatedToAllIndex the index of the last entry replicated to all followers.
524      * @return a new CaptureSnapshot instance.
525      */
526     public @NonNull CaptureSnapshot newCaptureSnapshot(final RaftEntryMeta lastLogEntry,
527             final long replicatedToAllIndex, final boolean mandatoryTrim) {
528         final var replLog = context.getReplicatedLog();
529         final var lastAppliedEntry = computeLastAppliedEntry(lastLogEntry);
530
531         final var entry = replLog.get(replicatedToAllIndex);
532         final var replicatedToAllEntry = entry != null ? entry : ImmutableRaftEntryMeta.of(-1, -1);
533
534         long lastAppliedIndex = lastAppliedEntry.index();
535         long lastAppliedTerm = lastAppliedEntry.term();
536
537         final var unAppliedEntries = replLog.getFrom(lastAppliedIndex + 1);
538
539         final long lastLogEntryIndex;
540         final long lastLogEntryTerm;
541         if (lastLogEntry == null) {
542             // When we don't have journal present, for example two captureSnapshots executed right after another with no
543             // new journal we still want to preserve the index and term in the snapshot.
544             lastAppliedIndex = lastLogEntryIndex = replLog.getSnapshotIndex();
545             lastAppliedTerm = lastLogEntryTerm = replLog.getSnapshotTerm();
546
547             log.debug("{}: Capturing Snapshot : lastLogEntry is null. Using snapshot values lastAppliedIndex {} and "
548                     + "lastAppliedTerm {} instead.", persistenceId(), lastAppliedIndex, lastAppliedTerm);
549         } else {
550             lastLogEntryIndex = lastLogEntry.index();
551             lastLogEntryTerm = lastLogEntry.term();
552         }
553
554         return new CaptureSnapshot(lastLogEntryIndex, lastLogEntryTerm, lastAppliedIndex, lastAppliedTerm,
555             replicatedToAllEntry.index(), replicatedToAllEntry.term(), unAppliedEntries, mandatoryTrim);
556     }
557
558     @NonNullByDefault
559     private RaftEntryMeta computeLastAppliedEntry(final @Nullable RaftEntryMeta lastLogEntry) {
560         return computeLastAppliedEntry(context.getReplicatedLog(), context.getLastApplied(), lastLogEntry,
561             context.hasFollowers());
562     }
563
564     @VisibleForTesting
565     @NonNullByDefault
566     static RaftEntryMeta computeLastAppliedEntry(final ReplicatedLog log, final long originalIndex,
567             final @Nullable RaftEntryMeta lastLogEntry, final boolean hasFollowers) {
568         if (hasFollowers) {
569             final var entry = log.lookupMeta(originalIndex);
570             if (entry != null) {
571                 return entry;
572             }
573
574             final var snapshotIndex = log.getSnapshotIndex();
575             if (snapshotIndex > -1) {
576                 return ImmutableRaftEntryMeta.of(snapshotIndex, log.getSnapshotTerm());
577             }
578         } else if (lastLogEntry != null) {
579             // since we have persisted the last-log-entry to persistent journal before the capture, we would want
580             // to snapshot from this entry.
581             return lastLogEntry;
582         }
583
584         return ImmutableRaftEntryMeta.of(-1, -1);
585     }
586 }