Improve segmented journal actor metrics
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / RaftActorRecoverySupport.java
1 /*
2  * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.raft;
9
10 import akka.persistence.RecoveryCompleted;
11 import akka.persistence.SnapshotOffer;
12 import com.google.common.base.Stopwatch;
13 import java.util.Collections;
14 import java.util.concurrent.TimeUnit;
15 import org.opendaylight.controller.cluster.PersistentDataProvider;
16 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
17 import org.opendaylight.controller.cluster.raft.messages.PersistentPayload;
18 import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
19 import org.opendaylight.controller.cluster.raft.persisted.DeleteEntries;
20 import org.opendaylight.controller.cluster.raft.persisted.EmptyState;
21 import org.opendaylight.controller.cluster.raft.persisted.MigratedSerializable;
22 import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
23 import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
24 import org.opendaylight.controller.cluster.raft.persisted.Snapshot.State;
25 import org.opendaylight.controller.cluster.raft.persisted.UpdateElectionTerm;
26 import org.slf4j.Logger;
27
28 /**
29  * Support class that handles persistence recovery for a RaftActor.
30  *
31  * @author Thomas Pantelis
32  */
33 class RaftActorRecoverySupport {
34     private final RaftActorContext context;
35     private final RaftActorRecoveryCohort cohort;
36
37     private int currentRecoveryBatchCount;
38     private boolean dataRecoveredWithPersistenceDisabled;
39     private boolean anyDataRecovered;
40     private boolean hasMigratedDataRecovered;
41
42     private Stopwatch recoveryTimer;
43     private Stopwatch recoverySnapshotTimer;
44     private final Logger log;
45
46     RaftActorRecoverySupport(final RaftActorContext context, final RaftActorRecoveryCohort cohort) {
47         this.context = context;
48         this.cohort = cohort;
49         log = context.getLogger();
50     }
51
52     boolean handleRecoveryMessage(final Object message, final PersistentDataProvider persistentProvider) {
53         log.trace("{}: handleRecoveryMessage: {}", context.getId(), message);
54
55         anyDataRecovered = anyDataRecovered || !(message instanceof RecoveryCompleted);
56
57         if (isMigratedSerializable(message)) {
58             hasMigratedDataRecovered = true;
59         }
60
61         boolean recoveryComplete = false;
62         if (message instanceof UpdateElectionTerm updateElectionTerm) {
63             context.getTermInformation().update(updateElectionTerm.getCurrentTerm(), updateElectionTerm.getVotedFor());
64         } else if (message instanceof SnapshotOffer snapshotOffer) {
65             onRecoveredSnapshot(snapshotOffer);
66         } else if (message instanceof ReplicatedLogEntry replicatedLogEntry) {
67             onRecoveredJournalLogEntry(replicatedLogEntry);
68         } else if (message instanceof ApplyJournalEntries applyJournalEntries) {
69             onRecoveredApplyLogEntries(applyJournalEntries.getToIndex());
70         } else if (message instanceof DeleteEntries deleteEntries) {
71             onDeleteEntries(deleteEntries);
72         } else if (message instanceof ServerConfigurationPayload serverConfigurationPayload) {
73             context.updatePeerIds(serverConfigurationPayload);
74         } else if (message instanceof RecoveryCompleted) {
75             recoveryComplete = true;
76             onRecoveryCompletedMessage(persistentProvider);
77         }
78
79         return recoveryComplete;
80     }
81
82     @SuppressWarnings("checkstyle:IllegalCatch")
83     private void possiblyRestoreFromSnapshot() {
84         Snapshot restoreFromSnapshot = cohort.getRestoreFromSnapshot();
85         if (restoreFromSnapshot == null) {
86             return;
87         }
88
89         if (anyDataRecovered) {
90             log.warn("{}: The provided restore snapshot was not applied because the persistence store is not empty",
91                     context.getId());
92             return;
93         }
94
95         log.debug("{}: Restore snapshot: {}", context.getId(), restoreFromSnapshot);
96
97         context.getSnapshotManager().apply(new ApplySnapshot(restoreFromSnapshot));
98     }
99
100     private ReplicatedLog replicatedLog() {
101         return context.getReplicatedLog();
102     }
103
104     private void initRecoveryTimers() {
105         if (recoveryTimer == null) {
106             recoveryTimer = Stopwatch.createStarted();
107         }
108         if (recoverySnapshotTimer == null && context.getConfigParams().getRecoverySnapshotIntervalSeconds() > 0) {
109             recoverySnapshotTimer = Stopwatch.createStarted();
110         }
111     }
112
113     private void onRecoveredSnapshot(final SnapshotOffer offer) {
114         log.debug("{}: SnapshotOffer called.", context.getId());
115
116         initRecoveryTimers();
117
118         Snapshot snapshot = (Snapshot) offer.snapshot();
119
120         for (ReplicatedLogEntry entry: snapshot.getUnAppliedEntries()) {
121             if (isMigratedPayload(entry)) {
122                 hasMigratedDataRecovered = true;
123             }
124         }
125
126         if (!context.getPersistenceProvider().isRecoveryApplicable()) {
127             // We may have just transitioned to disabled and have a snapshot containing state data and/or log
128             // entries - we don't want to preserve these, only the server config and election term info.
129
130             snapshot = Snapshot.create(
131                     EmptyState.INSTANCE, Collections.emptyList(), -1, -1, -1, -1,
132                     snapshot.getElectionTerm(), snapshot.getElectionVotedFor(), snapshot.getServerConfiguration());
133         }
134
135         // Create a replicated log with the snapshot information
136         // The replicated log can be used later on to retrieve this snapshot
137         // when we need to install it on a peer
138
139         context.setReplicatedLog(ReplicatedLogImpl.newInstance(snapshot, context));
140         context.setLastApplied(snapshot.getLastAppliedIndex());
141         context.setCommitIndex(snapshot.getLastAppliedIndex());
142         context.getTermInformation().update(snapshot.getElectionTerm(), snapshot.getElectionVotedFor());
143
144         final Stopwatch timer = Stopwatch.createStarted();
145
146         // Apply the snapshot to the actors state
147         final State snapshotState = snapshot.getState();
148         if (snapshotState.needsMigration()) {
149             hasMigratedDataRecovered = true;
150         }
151         if (!(snapshotState instanceof EmptyState)) {
152             cohort.applyRecoverySnapshot(snapshotState);
153         }
154
155         if (snapshot.getServerConfiguration() != null) {
156             context.updatePeerIds(snapshot.getServerConfiguration());
157         }
158
159         timer.stop();
160         log.info("Recovery snapshot applied for {} in {}: snapshotIndex={}, snapshotTerm={}, journal-size={}",
161                 context.getId(), timer, replicatedLog().getSnapshotIndex(), replicatedLog().getSnapshotTerm(),
162                 replicatedLog().size());
163     }
164
165     private void onRecoveredJournalLogEntry(final ReplicatedLogEntry logEntry) {
166         if (log.isDebugEnabled()) {
167             log.debug("{}: Received ReplicatedLogEntry for recovery: index: {}, size: {}", context.getId(),
168                     logEntry.getIndex(), logEntry.size());
169         }
170
171         if (isServerConfigurationPayload(logEntry)) {
172             context.updatePeerIds((ServerConfigurationPayload)logEntry.getData());
173         }
174
175         if (isMigratedPayload(logEntry)) {
176             hasMigratedDataRecovered = true;
177         }
178
179         if (context.getPersistenceProvider().isRecoveryApplicable()) {
180             replicatedLog().append(logEntry);
181         } else if (!isPersistentPayload(logEntry)) {
182             dataRecoveredWithPersistenceDisabled = true;
183         }
184     }
185
186     private void onRecoveredApplyLogEntries(final long toIndex) {
187         if (!context.getPersistenceProvider().isRecoveryApplicable()) {
188             dataRecoveredWithPersistenceDisabled = true;
189             return;
190         }
191
192         long lastUnappliedIndex = context.getLastApplied() + 1;
193
194         if (log.isDebugEnabled()) {
195             // it can happen that lastUnappliedIndex > toIndex, if the AJE is in the persistent journal
196             // but the entry itself has made it to that state and recovered via the snapshot
197             log.debug("{}: Received apply journal entries for recovery, applying to state: {} to {}",
198                     context.getId(), lastUnappliedIndex, toIndex);
199         }
200
201         long lastApplied = lastUnappliedIndex - 1;
202         for (long i = lastUnappliedIndex; i <= toIndex; i++) {
203             ReplicatedLogEntry logEntry = replicatedLog().get(i);
204             if (logEntry != null) {
205                 lastApplied++;
206                 batchRecoveredLogEntry(logEntry);
207                 if (shouldTakeRecoverySnapshot() && !context.getSnapshotManager().isCapturing()) {
208                     if (currentRecoveryBatchCount > 0) {
209                         endCurrentLogRecoveryBatch();
210                     }
211                     context.setLastApplied(lastApplied);
212                     context.setCommitIndex(lastApplied);
213                     takeRecoverySnapshot(logEntry);
214                 }
215             } else {
216                 // Shouldn't happen but cover it anyway.
217                 log.error("{}: Log entry not found for index {}", context.getId(), i);
218                 break;
219             }
220         }
221
222         context.setLastApplied(lastApplied);
223         context.setCommitIndex(lastApplied);
224     }
225
226     private void onDeleteEntries(final DeleteEntries deleteEntries) {
227         if (context.getPersistenceProvider().isRecoveryApplicable()) {
228             replicatedLog().removeFrom(deleteEntries.getFromIndex());
229         } else {
230             dataRecoveredWithPersistenceDisabled = true;
231         }
232     }
233
234     private void batchRecoveredLogEntry(final ReplicatedLogEntry logEntry) {
235         initRecoveryTimers();
236
237         int batchSize = context.getConfigParams().getJournalRecoveryLogBatchSize();
238         if (!isServerConfigurationPayload(logEntry)) {
239             if (currentRecoveryBatchCount == 0) {
240                 cohort.startLogRecoveryBatch(batchSize);
241             }
242
243             cohort.appendRecoveredLogEntry(logEntry.getData());
244
245             if (++currentRecoveryBatchCount >= batchSize) {
246                 endCurrentLogRecoveryBatch();
247             }
248         }
249     }
250
251     private void takeRecoverySnapshot(final ReplicatedLogEntry logEntry) {
252         log.info("Time for recovery snapshot on entry with index {}", logEntry.getIndex());
253         final SnapshotManager snapshotManager = context.getSnapshotManager();
254         if (snapshotManager.capture(logEntry, -1)) {
255             log.info("Capturing snapshot, resetting timer for the next recovery snapshot interval.");
256             recoverySnapshotTimer.reset().start();
257         } else {
258             log.info("SnapshotManager is not able to capture snapshot at this time. It will be retried "
259                 + "again with the next recovered entry.");
260         }
261     }
262
263     private boolean shouldTakeRecoverySnapshot() {
264         return recoverySnapshotTimer != null && recoverySnapshotTimer.elapsed(TimeUnit.SECONDS)
265             >= context.getConfigParams().getRecoverySnapshotIntervalSeconds();
266     }
267
268     private void endCurrentLogRecoveryBatch() {
269         cohort.applyCurrentLogRecoveryBatch();
270         currentRecoveryBatchCount = 0;
271     }
272
273     private void onRecoveryCompletedMessage(final PersistentDataProvider persistentProvider) {
274         if (currentRecoveryBatchCount > 0) {
275             endCurrentLogRecoveryBatch();
276         }
277
278         final String recoveryTime;
279         if (recoveryTimer != null) {
280             recoveryTime = " in " + recoveryTimer.stop();
281             recoveryTimer = null;
282         } else {
283             recoveryTime = "";
284         }
285
286         if (recoverySnapshotTimer != null) {
287             recoverySnapshotTimer.stop();
288             recoverySnapshotTimer = null;
289         }
290
291         log.info("{}: Recovery completed {} - Switching actor to Follower - last log index = {}, last log term = {}, "
292                 + "snapshot index = {}, snapshot term = {}, journal size = {}", context.getId(), recoveryTime,
293                 replicatedLog().lastIndex(), replicatedLog().lastTerm(), replicatedLog().getSnapshotIndex(),
294                 replicatedLog().getSnapshotTerm(), replicatedLog().size());
295
296         if (dataRecoveredWithPersistenceDisabled
297                 || hasMigratedDataRecovered && !context.getPersistenceProvider().isRecoveryApplicable()) {
298             if (hasMigratedDataRecovered) {
299                 log.info("{}: Saving snapshot after recovery due to migrated messages", context.getId());
300             } else {
301                 log.info("{}: Saving snapshot after recovery due to data persistence disabled", context.getId());
302             }
303
304             // Either data persistence is disabled and we recovered some data entries (ie we must have just
305             // transitioned to disabled or a persistence backup was restored) or we recovered migrated
306             // messages. Either way, we persist a snapshot and delete all the messages from the akka journal
307             // to clean out unwanted messages.
308
309             Snapshot snapshot = Snapshot.create(
310                     EmptyState.INSTANCE, Collections.<ReplicatedLogEntry>emptyList(),
311                     -1, -1, -1, -1,
312                     context.getTermInformation().getCurrentTerm(), context.getTermInformation().getVotedFor(),
313                     context.getPeerServerInfo(true));
314
315             persistentProvider.saveSnapshot(snapshot);
316
317             persistentProvider.deleteMessages(persistentProvider.getLastSequenceNumber());
318         } else if (hasMigratedDataRecovered) {
319             log.info("{}: Snapshot capture initiated after recovery due to migrated messages", context.getId());
320
321             context.getSnapshotManager().capture(replicatedLog().last(), -1);
322         } else {
323             possiblyRestoreFromSnapshot();
324         }
325     }
326
327     private static boolean isServerConfigurationPayload(final ReplicatedLogEntry repLogEntry) {
328         return repLogEntry.getData() instanceof ServerConfigurationPayload;
329     }
330
331     private static boolean isPersistentPayload(final ReplicatedLogEntry repLogEntry) {
332         return repLogEntry.getData() instanceof PersistentPayload;
333     }
334
335     private static boolean isMigratedPayload(final ReplicatedLogEntry repLogEntry) {
336         return isMigratedSerializable(repLogEntry.getData());
337     }
338
339     private static boolean isMigratedSerializable(final Object message) {
340         return message instanceof MigratedSerializable migrated && migrated.isMigrated();
341     }
342 }