Fix checkstyle reported by odlparent-3.0.0
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / SnapshotManager.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.raft;
10
11 import akka.persistence.SnapshotSelectionCriteria;
12 import com.google.common.annotations.VisibleForTesting;
13 import com.google.common.io.ByteSource;
14 import java.io.IOException;
15 import java.io.OutputStream;
16 import java.util.List;
17 import java.util.Optional;
18 import java.util.function.Consumer;
19 import javax.annotation.Nonnull;
20 import org.opendaylight.controller.cluster.io.FileBackedOutputStream;
21 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
22 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
23 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
24 import org.opendaylight.controller.cluster.raft.base.messages.SnapshotComplete;
25 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
26 import org.opendaylight.controller.cluster.raft.persisted.EmptyState;
27 import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
28 import org.slf4j.Logger;
29
30 /**
31  * Manages the capturing of snapshots for a RaftActor.
32  *
33  * @author Moiz Raja
34  * @author Thomas Pantelis
35  */
36 public class SnapshotManager implements SnapshotState {
37
38     @SuppressWarnings("checkstyle:MemberName")
39     private final SnapshotState IDLE = new Idle();
40
41     @SuppressWarnings({"checkstyle:MemberName", "checkstyle:AbbreviationAsWordInName"})
42     private final SnapshotState PERSISTING = new Persisting();
43
44     @SuppressWarnings({"checkstyle:MemberName", "checkstyle:AbbreviationAsWordInName"})
45     private final SnapshotState CREATING = new Creating();
46
47     private final Logger log;
48     private final RaftActorContext context;
49     private final LastAppliedTermInformationReader lastAppliedTermInformationReader =
50             new LastAppliedTermInformationReader();
51     private final ReplicatedToAllTermInformationReader replicatedToAllTermInformationReader =
52             new ReplicatedToAllTermInformationReader();
53
54
55     private SnapshotState currentState = IDLE;
56     private CaptureSnapshot captureSnapshot;
57     private long lastSequenceNumber = -1;
58
59     private Consumer<Optional<OutputStream>> createSnapshotProcedure;
60
61     private ApplySnapshot applySnapshot;
62     private RaftActorSnapshotCohort snapshotCohort = NoopRaftActorSnapshotCohort.INSTANCE;
63
64     /**
65      * Constructs an instance.
66      *
67      * @param context the RaftActorContext
68      * @param logger the Logger
69      */
70     public SnapshotManager(final RaftActorContext context, final Logger logger) {
71         this.context = context;
72         this.log = logger;
73     }
74
75     public boolean isApplying() {
76         return applySnapshot != null;
77     }
78
79     @Override
80     public boolean isCapturing() {
81         return currentState.isCapturing();
82     }
83
84     @Override
85     public boolean captureToInstall(final ReplicatedLogEntry lastLogEntry, final long replicatedToAllIndex,
86             final String targetFollower) {
87         return currentState.captureToInstall(lastLogEntry, replicatedToAllIndex, targetFollower);
88     }
89
90     @Override
91     public boolean capture(final ReplicatedLogEntry lastLogEntry, final long replicatedToAllIndex) {
92         return currentState.capture(lastLogEntry, replicatedToAllIndex);
93     }
94
95     @Override
96     public void apply(final ApplySnapshot snapshot) {
97         currentState.apply(snapshot);
98     }
99
100     @Override
101     public void persist(final Snapshot.State state, final Optional<OutputStream> installSnapshotStream,
102             final long totalMemory) {
103         currentState.persist(state, installSnapshotStream, totalMemory);
104     }
105
106     @Override
107     public void commit(final long sequenceNumber, final long timeStamp) {
108         currentState.commit(sequenceNumber, timeStamp);
109     }
110
111     @Override
112     public void rollback() {
113         currentState.rollback();
114     }
115
116     @Override
117     public long trimLog(final long desiredTrimIndex) {
118         return currentState.trimLog(desiredTrimIndex);
119     }
120
121     @SuppressWarnings("checkstyle:hiddenField")
122     void setCreateSnapshotConsumer(final Consumer<Optional<OutputStream>> createSnapshotProcedure) {
123         this.createSnapshotProcedure = createSnapshotProcedure;
124     }
125
126     void setSnapshotCohort(final RaftActorSnapshotCohort snapshotCohort) {
127         this.snapshotCohort = snapshotCohort;
128     }
129
130     @Nonnull
131     public Snapshot.State convertSnapshot(final ByteSource snapshotBytes) throws IOException {
132         return snapshotCohort.deserializeSnapshot(snapshotBytes);
133     }
134
135     public long getLastSequenceNumber() {
136         return lastSequenceNumber;
137     }
138
139     @VisibleForTesting
140     public CaptureSnapshot getCaptureSnapshot() {
141         return captureSnapshot;
142     }
143
144     private boolean hasFollowers() {
145         return context.hasFollowers();
146     }
147
148     private String persistenceId() {
149         return context.getId();
150     }
151
152     /**
153      * Constructs a CaptureSnapshot instance.
154      *
155      * @param lastLogEntry the last log entry for the snapshot.
156      * @param replicatedToAllIndex the index of the last entry replicated to all followers.
157      * @return a new CaptureSnapshot instance.
158      */
159     public CaptureSnapshot newCaptureSnapshot(final ReplicatedLogEntry lastLogEntry, final long replicatedToAllIndex) {
160         TermInformationReader lastAppliedTermInfoReader =
161                 lastAppliedTermInformationReader.init(context.getReplicatedLog(), context.getLastApplied(),
162                         lastLogEntry, hasFollowers());
163
164         long lastAppliedIndex = lastAppliedTermInfoReader.getIndex();
165         long lastAppliedTerm = lastAppliedTermInfoReader.getTerm();
166
167         TermInformationReader replicatedToAllTermInfoReader =
168                 replicatedToAllTermInformationReader.init(context.getReplicatedLog(), replicatedToAllIndex);
169
170         long newReplicatedToAllIndex = replicatedToAllTermInfoReader.getIndex();
171         long newReplicatedToAllTerm = replicatedToAllTermInfoReader.getTerm();
172
173         List<ReplicatedLogEntry> unAppliedEntries = context.getReplicatedLog().getFrom(lastAppliedIndex + 1);
174
175         long lastLogEntryIndex = lastAppliedIndex;
176         long lastLogEntryTerm = lastAppliedTerm;
177         if (lastLogEntry != null) {
178             lastLogEntryIndex = lastLogEntry.getIndex();
179             lastLogEntryTerm = lastLogEntry.getTerm();
180         } else {
181             log.debug("{}: Capturing Snapshot : lastLogEntry is null. Using lastAppliedIndex {} and "
182                     + "lastAppliedTerm {} instead.", persistenceId(), lastAppliedIndex, lastAppliedTerm);
183         }
184
185         return new CaptureSnapshot(lastLogEntryIndex, lastLogEntryTerm, lastAppliedIndex, lastAppliedTerm,
186                 newReplicatedToAllIndex, newReplicatedToAllTerm, unAppliedEntries);
187     }
188
189     private class AbstractSnapshotState implements SnapshotState {
190
191         @Override
192         public boolean isCapturing() {
193             return true;
194         }
195
196         @Override
197         public boolean capture(final ReplicatedLogEntry lastLogEntry, final long replicatedToAllIndex) {
198             log.debug("capture should not be called in state {}", this);
199             return false;
200         }
201
202         @Override
203         public boolean captureToInstall(final ReplicatedLogEntry lastLogEntry, final long replicatedToAllIndex,
204                 final String targetFollower) {
205             log.debug("captureToInstall should not be called in state {}", this);
206             return false;
207         }
208
209         @Override
210         public void apply(final ApplySnapshot snapshot) {
211             log.debug("apply should not be called in state {}", this);
212         }
213
214         @Override
215         public void persist(final Snapshot.State state, final Optional<OutputStream> installSnapshotStream,
216                 final long totalMemory) {
217             log.debug("persist should not be called in state {}", this);
218         }
219
220         @Override
221         public void commit(final long sequenceNumber, final long timeStamp) {
222             log.debug("commit should not be called in state {}", this);
223         }
224
225         @Override
226         public void rollback() {
227             log.debug("rollback should not be called in state {}", this);
228         }
229
230         @Override
231         public long trimLog(final long desiredTrimIndex) {
232             log.debug("trimLog should not be called in state {}", this);
233             return -1;
234         }
235
236         protected long doTrimLog(final long desiredTrimIndex) {
237             //  we would want to keep the lastApplied as its used while capturing snapshots
238             long lastApplied = context.getLastApplied();
239             long tempMin = Math.min(desiredTrimIndex, lastApplied > -1 ? lastApplied - 1 : -1);
240
241             if (log.isTraceEnabled()) {
242                 log.trace("{}: performSnapshotWithoutCapture: desiredTrimIndex: {}, lastApplied: {}, tempMin: {}",
243                         persistenceId(), desiredTrimIndex, lastApplied, tempMin);
244             }
245
246             if (tempMin > -1 && context.getReplicatedLog().isPresent(tempMin)) {
247                 log.debug("{}: fakeSnapshot purging log to {} for term {}", persistenceId(), tempMin,
248                         context.getTermInformation().getCurrentTerm());
249
250                 //use the term of the temp-min, since we check for isPresent, entry will not be null
251                 ReplicatedLogEntry entry = context.getReplicatedLog().get(tempMin);
252                 context.getReplicatedLog().snapshotPreCommit(tempMin, entry.getTerm());
253                 context.getReplicatedLog().snapshotCommit();
254                 return tempMin;
255             }
256
257             final RaftActorBehavior currentBehavior = context.getCurrentBehavior();
258             if (tempMin > currentBehavior.getReplicatedToAllIndex()) {
259                 // It's possible a follower was lagging and an install snapshot advanced its match index past
260                 // the current replicatedToAllIndex. Since the follower is now caught up we should advance the
261                 // replicatedToAllIndex (to tempMin). The fact that tempMin wasn't found in the log is likely
262                 // due to a previous snapshot triggered by the memory threshold exceeded, in that case we
263                 // trim the log to the last applied index even if previous entries weren't replicated to all followers.
264                 currentBehavior.setReplicatedToAllIndex(tempMin);
265             }
266             return -1;
267         }
268     }
269
270     private class Idle extends AbstractSnapshotState {
271
272         @Override
273         public boolean isCapturing() {
274             return false;
275         }
276
277         @SuppressWarnings("checkstyle:IllegalCatch")
278         private boolean capture(final ReplicatedLogEntry lastLogEntry, final long replicatedToAllIndex,
279                 final String targetFollower) {
280             captureSnapshot = newCaptureSnapshot(lastLogEntry, replicatedToAllIndex);
281
282             OutputStream installSnapshotStream = null;
283             if (targetFollower != null) {
284                 installSnapshotStream = context.getFileBackedOutputStreamFactory().newInstance();
285                 log.info("{}: Initiating snapshot capture {} to install on {}",
286                         persistenceId(), captureSnapshot, targetFollower);
287             } else {
288                 log.info("{}: Initiating snapshot capture {}", persistenceId(), captureSnapshot);
289             }
290
291             lastSequenceNumber = context.getPersistenceProvider().getLastSequenceNumber();
292
293             log.debug("{}: lastSequenceNumber prior to capture: {}", persistenceId(), lastSequenceNumber);
294
295             SnapshotManager.this.currentState = CREATING;
296
297             try {
298                 createSnapshotProcedure.accept(Optional.ofNullable(installSnapshotStream));
299             } catch (Exception e) {
300                 SnapshotManager.this.currentState = IDLE;
301                 log.error("Error creating snapshot", e);
302                 return false;
303             }
304
305             return true;
306         }
307
308         @Override
309         public boolean capture(final ReplicatedLogEntry lastLogEntry, final long replicatedToAllIndex) {
310             return capture(lastLogEntry, replicatedToAllIndex, null);
311         }
312
313         @Override
314         public boolean captureToInstall(final ReplicatedLogEntry lastLogEntry, final long replicatedToAllIndex,
315                 final String targetFollower) {
316             return capture(lastLogEntry, replicatedToAllIndex, targetFollower);
317         }
318
319         @Override
320         public void apply(final ApplySnapshot toApply) {
321             SnapshotManager.this.applySnapshot = toApply;
322
323             lastSequenceNumber = context.getPersistenceProvider().getLastSequenceNumber();
324
325             log.debug("lastSequenceNumber prior to persisting applied snapshot: {}", lastSequenceNumber);
326
327             context.getPersistenceProvider().saveSnapshot(toApply.getSnapshot());
328
329             SnapshotManager.this.currentState = PERSISTING;
330         }
331
332         @Override
333         public String toString() {
334             return "Idle";
335         }
336
337         @Override
338         public long trimLog(final long desiredTrimIndex) {
339             return doTrimLog(desiredTrimIndex);
340         }
341     }
342
343     private class Creating extends AbstractSnapshotState {
344
345         @Override
346         public void persist(final Snapshot.State snapshotState, final Optional<OutputStream> installSnapshotStream,
347                 final long totalMemory) {
348             // create a snapshot object from the state provided and save it
349             // when snapshot is saved async, SaveSnapshotSuccess is raised.
350
351             Snapshot snapshot = Snapshot.create(snapshotState,
352                     captureSnapshot.getUnAppliedEntries(),
353                     captureSnapshot.getLastIndex(), captureSnapshot.getLastTerm(),
354                     captureSnapshot.getLastAppliedIndex(), captureSnapshot.getLastAppliedTerm(),
355                     context.getTermInformation().getCurrentTerm(),
356                     context.getTermInformation().getVotedFor(), context.getPeerServerInfo(true));
357
358             context.getPersistenceProvider().saveSnapshot(snapshot);
359
360             log.info("{}: Persisting of snapshot done: {}", persistenceId(), snapshot);
361
362             long dataThreshold = totalMemory * context.getConfigParams().getSnapshotDataThresholdPercentage() / 100;
363             boolean dataSizeThresholdExceeded = context.getReplicatedLog().dataSize() > dataThreshold;
364
365             boolean logSizeExceededSnapshotBatchCount =
366                     context.getReplicatedLog().size() >= context.getConfigParams().getSnapshotBatchCount();
367
368             final RaftActorBehavior currentBehavior = context.getCurrentBehavior();
369             if (dataSizeThresholdExceeded || logSizeExceededSnapshotBatchCount) {
370                 if (log.isDebugEnabled()) {
371                     if (dataSizeThresholdExceeded) {
372                         log.debug("{}: log data size {} exceeds the memory threshold {} - doing snapshotPreCommit "
373                                 + "with index {}", context.getId(), context.getReplicatedLog().dataSize(),
374                                 dataThreshold, captureSnapshot.getLastAppliedIndex());
375                     } else {
376                         log.debug("{}: log size {} exceeds the snapshot batch count {} - doing snapshotPreCommit with "
377                                 + "index {}", context.getId(), context.getReplicatedLog().size(),
378                                 context.getConfigParams().getSnapshotBatchCount(),
379                                 captureSnapshot.getLastAppliedIndex());
380                     }
381                 }
382
383                 // We either exceeded the memory threshold or the log size exceeded the snapshot batch
384                 // count so, to keep the log memory footprint in check, clear the log based on lastApplied.
385                 // This could/should only happen if one of the followers is down as normally we keep
386                 // removing from the log as entries are replicated to all.
387                 context.getReplicatedLog().snapshotPreCommit(captureSnapshot.getLastAppliedIndex(),
388                         captureSnapshot.getLastAppliedTerm());
389
390                 // Don't reset replicatedToAllIndex to -1 as this may prevent us from trimming the log after an
391                 // install snapshot to a follower.
392                 if (captureSnapshot.getReplicatedToAllIndex() >= 0) {
393                     currentBehavior.setReplicatedToAllIndex(captureSnapshot.getReplicatedToAllIndex());
394                 }
395
396             } else if (captureSnapshot.getReplicatedToAllIndex() != -1) {
397                 // clear the log based on replicatedToAllIndex
398                 context.getReplicatedLog().snapshotPreCommit(captureSnapshot.getReplicatedToAllIndex(),
399                         captureSnapshot.getReplicatedToAllTerm());
400
401                 currentBehavior.setReplicatedToAllIndex(captureSnapshot.getReplicatedToAllIndex());
402             } else {
403                 // The replicatedToAllIndex was not found in the log
404                 // This means that replicatedToAllIndex never moved beyond -1 or that it is already in the snapshot.
405                 // In this scenario we may need to save the snapshot to the akka persistence
406                 // snapshot for recovery but we do not need to do the replicated log trimming.
407                 context.getReplicatedLog().snapshotPreCommit(context.getReplicatedLog().getSnapshotIndex(),
408                         context.getReplicatedLog().getSnapshotTerm());
409             }
410
411             log.info("{}: Removed in-memory snapshotted entries, adjusted snaphsotIndex: {} and term: {}",
412                     context.getId(), context.getReplicatedLog().getSnapshotIndex(),
413                     context.getReplicatedLog().getSnapshotTerm());
414
415             if (installSnapshotStream.isPresent()) {
416                 if (context.getId().equals(currentBehavior.getLeaderId())) {
417                     try {
418                         ByteSource snapshotBytes = ((FileBackedOutputStream)installSnapshotStream.get()).asByteSource();
419                         currentBehavior.handleMessage(context.getActor(),
420                                 new SendInstallSnapshot(snapshot, snapshotBytes));
421                     } catch (IOException e) {
422                         log.error("{}: Snapshot install failed due to an unrecoverable streaming error",
423                                 context.getId(), e);
424                     }
425                 } else {
426                     ((FileBackedOutputStream)installSnapshotStream.get()).cleanup();
427                 }
428             }
429
430             captureSnapshot = null;
431             SnapshotManager.this.currentState = PERSISTING;
432         }
433
434         @Override
435         public String toString() {
436             return "Creating";
437         }
438
439     }
440
441     private class Persisting extends AbstractSnapshotState {
442
443         @Override
444         @SuppressWarnings("checkstyle:IllegalCatch")
445         public void commit(final long sequenceNumber, final long timeStamp) {
446             log.debug("{}: Snapshot success -  sequence number: {}", persistenceId(), sequenceNumber);
447
448             if (applySnapshot != null) {
449                 try {
450                     Snapshot snapshot = applySnapshot.getSnapshot();
451
452                     //clears the followers log, sets the snapshot index to ensure adjusted-index works
453                     context.setReplicatedLog(ReplicatedLogImpl.newInstance(snapshot, context));
454                     context.setLastApplied(snapshot.getLastAppliedIndex());
455                     context.setCommitIndex(snapshot.getLastAppliedIndex());
456                     context.getTermInformation().update(snapshot.getElectionTerm(), snapshot.getElectionVotedFor());
457
458                     if (snapshot.getServerConfiguration() != null) {
459                         context.updatePeerIds(snapshot.getServerConfiguration());
460                     }
461
462                     if (!(snapshot.getState() instanceof EmptyState)) {
463                         snapshotCohort.applySnapshot(snapshot.getState());
464                     }
465
466                     applySnapshot.getCallback().onSuccess();
467                 } catch (Exception e) {
468                     log.error("{}: Error applying snapshot", context.getId(), e);
469                 }
470             } else {
471                 context.getReplicatedLog().snapshotCommit();
472             }
473
474             context.getPersistenceProvider().deleteSnapshots(new SnapshotSelectionCriteria(sequenceNumber,
475                     timeStamp - 1, 0L, 0L));
476
477             context.getPersistenceProvider().deleteMessages(lastSequenceNumber);
478
479             snapshotComplete();
480         }
481
482         @Override
483         public void rollback() {
484             // Nothing to rollback if we're applying a snapshot from the leader.
485             if (applySnapshot == null) {
486                 context.getReplicatedLog().snapshotRollback();
487
488                 log.info("{}: Replicated Log rolled back. Snapshot will be attempted in the next cycle."
489                         + "snapshotIndex:{}, snapshotTerm:{}, log-size:{}", persistenceId(),
490                         context.getReplicatedLog().getSnapshotIndex(),
491                         context.getReplicatedLog().getSnapshotTerm(),
492                         context.getReplicatedLog().size());
493             } else {
494                 applySnapshot.getCallback().onFailure();
495             }
496
497             snapshotComplete();
498         }
499
500         private void snapshotComplete() {
501             lastSequenceNumber = -1;
502             applySnapshot = null;
503             SnapshotManager.this.currentState = IDLE;
504
505             context.getActor().tell(SnapshotComplete.INSTANCE, context.getActor());
506         }
507
508         @Override
509         public String toString() {
510             return "Persisting";
511         }
512
513     }
514
515     private interface TermInformationReader {
516         long getIndex();
517
518         long getTerm();
519     }
520
521     static class LastAppliedTermInformationReader implements TermInformationReader {
522         private long index;
523         private long term;
524
525         LastAppliedTermInformationReader init(final ReplicatedLog log, final long originalIndex,
526                 final ReplicatedLogEntry lastLogEntry, final boolean hasFollowers) {
527             ReplicatedLogEntry entry = log.get(originalIndex);
528             this.index = -1L;
529             this.term = -1L;
530             if (!hasFollowers) {
531                 if (lastLogEntry != null) {
532                     // since we have persisted the last-log-entry to persistent journal before the capture,
533                     // we would want to snapshot from this entry.
534                     index = lastLogEntry.getIndex();
535                     term = lastLogEntry.getTerm();
536                 }
537             } else if (entry != null) {
538                 index = entry.getIndex();
539                 term = entry.getTerm();
540             } else if (log.getSnapshotIndex() > -1) {
541                 index = log.getSnapshotIndex();
542                 term = log.getSnapshotTerm();
543             }
544             return this;
545         }
546
547         @Override
548         public long getIndex() {
549             return this.index;
550         }
551
552         @Override
553         public long getTerm() {
554             return this.term;
555         }
556     }
557
558     private static class ReplicatedToAllTermInformationReader implements TermInformationReader {
559         private long index;
560         private long term;
561
562         ReplicatedToAllTermInformationReader init(final ReplicatedLog log, final long originalIndex) {
563             ReplicatedLogEntry entry = log.get(originalIndex);
564             this.index = -1L;
565             this.term = -1L;
566
567             if (entry != null) {
568                 index = entry.getIndex();
569                 term = entry.getTerm();
570             }
571
572             return this;
573         }
574
575         @Override
576         public long getIndex() {
577             return this.index;
578         }
579
580         @Override
581         public long getTerm() {
582             return this.term;
583         }
584     }
585 }