Allow incremental recovery
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / RaftActorRecoverySupport.java
1 /*
2  * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.raft;
9
10 import akka.persistence.RecoveryCompleted;
11 import akka.persistence.SnapshotOffer;
12 import com.google.common.base.Stopwatch;
13 import java.util.Collections;
14 import java.util.concurrent.TimeUnit;
15 import org.opendaylight.controller.cluster.PersistentDataProvider;
16 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
17 import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
18 import org.opendaylight.controller.cluster.raft.persisted.DeleteEntries;
19 import org.opendaylight.controller.cluster.raft.persisted.EmptyState;
20 import org.opendaylight.controller.cluster.raft.persisted.MigratedSerializable;
21 import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
22 import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
23 import org.opendaylight.controller.cluster.raft.persisted.Snapshot.State;
24 import org.opendaylight.controller.cluster.raft.persisted.UpdateElectionTerm;
25 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.PersistentPayload;
26 import org.slf4j.Logger;
27
28 /**
29  * Support class that handles persistence recovery for a RaftActor.
30  *
31  * @author Thomas Pantelis
32  */
33 class RaftActorRecoverySupport {
34     private final RaftActorContext context;
35     private final RaftActorRecoveryCohort cohort;
36
37     private int currentRecoveryBatchCount;
38     private boolean dataRecoveredWithPersistenceDisabled;
39     private boolean anyDataRecovered;
40     private boolean hasMigratedDataRecovered;
41
42     private Stopwatch recoveryTimer;
43     private Stopwatch recoverySnapshotTimer;
44     private final Logger log;
45
46     RaftActorRecoverySupport(final RaftActorContext context, final RaftActorRecoveryCohort cohort) {
47         this.context = context;
48         this.cohort = cohort;
49         this.log = context.getLogger();
50     }
51
52     boolean handleRecoveryMessage(final Object message, final PersistentDataProvider persistentProvider) {
53         log.trace("{}: handleRecoveryMessage: {}", context.getId(), message);
54
55         anyDataRecovered = anyDataRecovered || !(message instanceof RecoveryCompleted);
56
57         if (isMigratedSerializable(message)) {
58             hasMigratedDataRecovered = true;
59         }
60
61         boolean recoveryComplete = false;
62         if (message instanceof UpdateElectionTerm) {
63             context.getTermInformation().update(((UpdateElectionTerm) message).getCurrentTerm(),
64                     ((UpdateElectionTerm) message).getVotedFor());
65         } else if (message instanceof SnapshotOffer) {
66             onRecoveredSnapshot((SnapshotOffer) message);
67         } else if (message instanceof ReplicatedLogEntry) {
68             onRecoveredJournalLogEntry((ReplicatedLogEntry) message);
69         } else if (message instanceof ApplyJournalEntries) {
70             onRecoveredApplyLogEntries(((ApplyJournalEntries) message).getToIndex());
71         } else if (message instanceof DeleteEntries) {
72             onDeleteEntries((DeleteEntries) message);
73         } else if (message instanceof ServerConfigurationPayload) {
74             context.updatePeerIds((ServerConfigurationPayload)message);
75         } else if (message instanceof RecoveryCompleted) {
76             recoveryComplete = true;
77             onRecoveryCompletedMessage(persistentProvider);
78         }
79
80         return recoveryComplete;
81     }
82
83     @SuppressWarnings("checkstyle:IllegalCatch")
84     private void possiblyRestoreFromSnapshot() {
85         Snapshot restoreFromSnapshot = cohort.getRestoreFromSnapshot();
86         if (restoreFromSnapshot == null) {
87             return;
88         }
89
90         if (anyDataRecovered) {
91             log.warn("{}: The provided restore snapshot was not applied because the persistence store is not empty",
92                     context.getId());
93             return;
94         }
95
96         log.debug("{}: Restore snapshot: {}", context.getId(), restoreFromSnapshot);
97
98         context.getSnapshotManager().apply(new ApplySnapshot(restoreFromSnapshot));
99     }
100
101     private ReplicatedLog replicatedLog() {
102         return context.getReplicatedLog();
103     }
104
105     private void initRecoveryTimers() {
106         if (recoveryTimer == null) {
107             recoveryTimer = Stopwatch.createStarted();
108         }
109         if (recoverySnapshotTimer == null && context.getConfigParams().getRecoverySnapshotIntervalSeconds() > 0) {
110             recoverySnapshotTimer = Stopwatch.createStarted();
111         }
112     }
113
114     private void onRecoveredSnapshot(final SnapshotOffer offer) {
115         log.debug("{}: SnapshotOffer called.", context.getId());
116
117         initRecoveryTimers();
118
119         Snapshot snapshot = (Snapshot) offer.snapshot();
120
121         for (ReplicatedLogEntry entry: snapshot.getUnAppliedEntries()) {
122             if (isMigratedPayload(entry)) {
123                 hasMigratedDataRecovered = true;
124             }
125         }
126
127         if (!context.getPersistenceProvider().isRecoveryApplicable()) {
128             // We may have just transitioned to disabled and have a snapshot containing state data and/or log
129             // entries - we don't want to preserve these, only the server config and election term info.
130
131             snapshot = Snapshot.create(
132                     EmptyState.INSTANCE, Collections.emptyList(), -1, -1, -1, -1,
133                     snapshot.getElectionTerm(), snapshot.getElectionVotedFor(), snapshot.getServerConfiguration());
134         }
135
136         // Create a replicated log with the snapshot information
137         // The replicated log can be used later on to retrieve this snapshot
138         // when we need to install it on a peer
139
140         context.setReplicatedLog(ReplicatedLogImpl.newInstance(snapshot, context));
141         context.setLastApplied(snapshot.getLastAppliedIndex());
142         context.setCommitIndex(snapshot.getLastAppliedIndex());
143         context.getTermInformation().update(snapshot.getElectionTerm(), snapshot.getElectionVotedFor());
144
145         final Stopwatch timer = Stopwatch.createStarted();
146
147         // Apply the snapshot to the actors state
148         final State snapshotState = snapshot.getState();
149         if (snapshotState.needsMigration()) {
150             hasMigratedDataRecovered = true;
151         }
152         if (!(snapshotState instanceof EmptyState)) {
153             cohort.applyRecoverySnapshot(snapshotState);
154         }
155
156         if (snapshot.getServerConfiguration() != null) {
157             context.updatePeerIds(snapshot.getServerConfiguration());
158         }
159
160         timer.stop();
161         log.info("Recovery snapshot applied for {} in {}: snapshotIndex={}, snapshotTerm={}, journal-size={}",
162                 context.getId(), timer, replicatedLog().getSnapshotIndex(), replicatedLog().getSnapshotTerm(),
163                 replicatedLog().size());
164     }
165
166     private void onRecoveredJournalLogEntry(final ReplicatedLogEntry logEntry) {
167         if (log.isDebugEnabled()) {
168             log.debug("{}: Received ReplicatedLogEntry for recovery: index: {}, size: {}", context.getId(),
169                     logEntry.getIndex(), logEntry.size());
170         }
171
172         if (isServerConfigurationPayload(logEntry)) {
173             context.updatePeerIds((ServerConfigurationPayload)logEntry.getData());
174         }
175
176         if (isMigratedPayload(logEntry)) {
177             hasMigratedDataRecovered = true;
178         }
179
180         if (context.getPersistenceProvider().isRecoveryApplicable()) {
181             replicatedLog().append(logEntry);
182         } else if (!isPersistentPayload(logEntry)) {
183             dataRecoveredWithPersistenceDisabled = true;
184         }
185     }
186
187     private void onRecoveredApplyLogEntries(final long toIndex) {
188         if (!context.getPersistenceProvider().isRecoveryApplicable()) {
189             dataRecoveredWithPersistenceDisabled = true;
190             return;
191         }
192
193         long lastUnappliedIndex = context.getLastApplied() + 1;
194
195         if (log.isDebugEnabled()) {
196             // it can happen that lastUnappliedIndex > toIndex, if the AJE is in the persistent journal
197             // but the entry itself has made it to that state and recovered via the snapshot
198             log.debug("{}: Received apply journal entries for recovery, applying to state: {} to {}",
199                     context.getId(), lastUnappliedIndex, toIndex);
200         }
201
202         long lastApplied = lastUnappliedIndex - 1;
203         for (long i = lastUnappliedIndex; i <= toIndex; i++) {
204             ReplicatedLogEntry logEntry = replicatedLog().get(i);
205             if (logEntry != null) {
206                 lastApplied++;
207                 batchRecoveredLogEntry(logEntry);
208                 if (shouldTakeRecoverySnapshot() && !context.getSnapshotManager().isCapturing()) {
209                     if (currentRecoveryBatchCount > 0) {
210                         endCurrentLogRecoveryBatch();
211                     }
212                     context.setLastApplied(lastApplied);
213                     context.setCommitIndex(lastApplied);
214                     takeRecoverySnapshot(logEntry);
215                 }
216             } else {
217                 // Shouldn't happen but cover it anyway.
218                 log.error("{}: Log entry not found for index {}", context.getId(), i);
219                 break;
220             }
221         }
222
223         context.setLastApplied(lastApplied);
224         context.setCommitIndex(lastApplied);
225     }
226
227     private void onDeleteEntries(final DeleteEntries deleteEntries) {
228         if (context.getPersistenceProvider().isRecoveryApplicable()) {
229             replicatedLog().removeFrom(deleteEntries.getFromIndex());
230         } else {
231             dataRecoveredWithPersistenceDisabled = true;
232         }
233     }
234
235     private void batchRecoveredLogEntry(final ReplicatedLogEntry logEntry) {
236         initRecoveryTimers();
237
238         int batchSize = context.getConfigParams().getJournalRecoveryLogBatchSize();
239         if (!isServerConfigurationPayload(logEntry)) {
240             if (currentRecoveryBatchCount == 0) {
241                 cohort.startLogRecoveryBatch(batchSize);
242             }
243
244             cohort.appendRecoveredLogEntry(logEntry.getData());
245
246             if (++currentRecoveryBatchCount >= batchSize) {
247                 endCurrentLogRecoveryBatch();
248             }
249         }
250     }
251
252     private void takeRecoverySnapshot(final ReplicatedLogEntry logEntry) {
253         log.info("Time for recovery snapshot on entry with index {}", logEntry.getIndex());
254         final SnapshotManager snapshotManager = context.getSnapshotManager();
255         if (snapshotManager.capture(logEntry, -1)) {
256             log.info("Capturing snapshot, resetting timer for the next recovery snapshot interval.");
257             this.recoverySnapshotTimer.reset().start();
258         } else {
259             log.info("SnapshotManager is not able to capture snapshot at this time. It will be retried "
260                 + "again with the next recovered entry.");
261         }
262     }
263
264     private boolean shouldTakeRecoverySnapshot() {
265         return this.recoverySnapshotTimer != null && this.recoverySnapshotTimer.elapsed(TimeUnit.SECONDS)
266             >= context.getConfigParams().getRecoverySnapshotIntervalSeconds();
267     }
268
269     private void endCurrentLogRecoveryBatch() {
270         cohort.applyCurrentLogRecoveryBatch();
271         currentRecoveryBatchCount = 0;
272     }
273
274     private void onRecoveryCompletedMessage(final PersistentDataProvider persistentProvider) {
275         if (currentRecoveryBatchCount > 0) {
276             endCurrentLogRecoveryBatch();
277         }
278
279         final String recoveryTime;
280         if (recoveryTimer != null) {
281             recoveryTime = " in " + recoveryTimer.stop();
282             recoveryTimer = null;
283         } else {
284             recoveryTime = "";
285         }
286
287         if (recoverySnapshotTimer != null) {
288             recoverySnapshotTimer.stop();
289             recoverySnapshotTimer = null;
290         }
291
292         log.info("{}: Recovery completed {} - Switching actor to Follower - last log index = {}, last log term = {}, "
293                 + "snapshot index = {}, snapshot term = {}, journal size = {}", context.getId(), recoveryTime,
294                 replicatedLog().lastIndex(), replicatedLog().lastTerm(), replicatedLog().getSnapshotIndex(),
295                 replicatedLog().getSnapshotTerm(), replicatedLog().size());
296
297         if (dataRecoveredWithPersistenceDisabled
298                 || hasMigratedDataRecovered && !context.getPersistenceProvider().isRecoveryApplicable()) {
299             if (hasMigratedDataRecovered) {
300                 log.info("{}: Saving snapshot after recovery due to migrated messages", context.getId());
301             } else {
302                 log.info("{}: Saving snapshot after recovery due to data persistence disabled", context.getId());
303             }
304
305             // Either data persistence is disabled and we recovered some data entries (ie we must have just
306             // transitioned to disabled or a persistence backup was restored) or we recovered migrated
307             // messages. Either way, we persist a snapshot and delete all the messages from the akka journal
308             // to clean out unwanted messages.
309
310             Snapshot snapshot = Snapshot.create(
311                     EmptyState.INSTANCE, Collections.<ReplicatedLogEntry>emptyList(),
312                     -1, -1, -1, -1,
313                     context.getTermInformation().getCurrentTerm(), context.getTermInformation().getVotedFor(),
314                     context.getPeerServerInfo(true));
315
316             persistentProvider.saveSnapshot(snapshot);
317
318             persistentProvider.deleteMessages(persistentProvider.getLastSequenceNumber());
319         } else if (hasMigratedDataRecovered) {
320             log.info("{}: Snapshot capture initiated after recovery due to migrated messages", context.getId());
321
322             context.getSnapshotManager().capture(replicatedLog().last(), -1);
323         } else {
324             possiblyRestoreFromSnapshot();
325         }
326     }
327
328     private static boolean isServerConfigurationPayload(final ReplicatedLogEntry repLogEntry) {
329         return repLogEntry.getData() instanceof ServerConfigurationPayload;
330     }
331
332     private static boolean isPersistentPayload(final ReplicatedLogEntry repLogEntry) {
333         return repLogEntry.getData() instanceof PersistentPayload;
334     }
335
336     private static boolean isMigratedPayload(final ReplicatedLogEntry repLogEntry) {
337         return isMigratedSerializable(repLogEntry.getData());
338     }
339
340     private static boolean isMigratedSerializable(final Object message) {
341         return message instanceof MigratedSerializable && ((MigratedSerializable)message).isMigrated();
342     }
343 }