/*
 * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
8 package org.opendaylight.controller.cluster.raft;
10 import akka.persistence.RecoveryCompleted;
11 import akka.persistence.SnapshotOffer;
12 import com.google.common.base.Stopwatch;
13 import java.util.Collections;
14 import java.util.concurrent.TimeUnit;
15 import org.opendaylight.controller.cluster.PersistentDataProvider;
16 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
17 import org.opendaylight.controller.cluster.raft.messages.PersistentPayload;
18 import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
19 import org.opendaylight.controller.cluster.raft.persisted.DeleteEntries;
20 import org.opendaylight.controller.cluster.raft.persisted.EmptyState;
21 import org.opendaylight.controller.cluster.raft.persisted.MigratedSerializable;
22 import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
23 import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
24 import org.opendaylight.controller.cluster.raft.persisted.Snapshot.State;
25 import org.opendaylight.controller.cluster.raft.persisted.UpdateElectionTerm;
26 import org.slf4j.Logger;
/**
 * Support class that handles persistence recovery for a RaftActor.
 *
 * @author Thomas Pantelis
 */
33 class RaftActorRecoverySupport {
34 private final RaftActorContext context;
35 private final RaftActorRecoveryCohort cohort;
37 private int currentRecoveryBatchCount;
38 private boolean dataRecoveredWithPersistenceDisabled;
39 private boolean anyDataRecovered;
40 private boolean hasMigratedDataRecovered;
42 private Stopwatch recoveryTimer;
43 private Stopwatch recoverySnapshotTimer;
44 private final Logger log;
/**
 * Creates the recovery support for one actor.
 *
 * @param context the actor context; also supplies the logger used by this class
 * @param cohort the cohort that receives recovered snapshot state and log entries
 */
RaftActorRecoverySupport(final RaftActorContext context, final RaftActorRecoveryCohort cohort) {
    this.context = context;
    this.cohort = cohort;
    this.log = context.getLogger();
}
52 boolean handleRecoveryMessage(final Object message, final PersistentDataProvider persistentProvider) {
53 log.trace("{}: handleRecoveryMessage: {}", context.getId(), message);
55 anyDataRecovered = anyDataRecovered || !(message instanceof RecoveryCompleted);
57 if (isMigratedSerializable(message)) {
58 hasMigratedDataRecovered = true;
61 boolean recoveryComplete = false;
62 if (message instanceof UpdateElectionTerm updateElectionTerm) {
63 context.getTermInformation().update(updateElectionTerm.getCurrentTerm(), updateElectionTerm.getVotedFor());
64 } else if (message instanceof SnapshotOffer snapshotOffer) {
65 onRecoveredSnapshot(snapshotOffer);
66 } else if (message instanceof ReplicatedLogEntry replicatedLogEntry) {
67 onRecoveredJournalLogEntry(replicatedLogEntry);
68 } else if (message instanceof ApplyJournalEntries applyJournalEntries) {
69 onRecoveredApplyLogEntries(applyJournalEntries.getToIndex());
70 } else if (message instanceof DeleteEntries deleteEntries) {
71 onDeleteEntries(deleteEntries);
72 } else if (message instanceof ServerConfigurationPayload serverConfigurationPayload) {
73 context.updatePeerIds(serverConfigurationPayload);
74 } else if (message instanceof RecoveryCompleted) {
75 recoveryComplete = true;
76 onRecoveryCompletedMessage(persistentProvider);
79 return recoveryComplete;
82 @SuppressWarnings("checkstyle:IllegalCatch")
83 private void possiblyRestoreFromSnapshot() {
84 Snapshot restoreFromSnapshot = cohort.getRestoreFromSnapshot();
85 if (restoreFromSnapshot == null) {
89 if (anyDataRecovered) {
90 log.warn("{}: The provided restore snapshot was not applied because the persistence store is not empty",
95 log.debug("{}: Restore snapshot: {}", context.getId(), restoreFromSnapshot);
97 context.getSnapshotManager().apply(new ApplySnapshot(restoreFromSnapshot));
100 private ReplicatedLog replicatedLog() {
101 return context.getReplicatedLog();
104 private void initRecoveryTimers() {
105 if (recoveryTimer == null) {
106 recoveryTimer = Stopwatch.createStarted();
108 if (recoverySnapshotTimer == null && context.getConfigParams().getRecoverySnapshotIntervalSeconds() > 0) {
109 recoverySnapshotTimer = Stopwatch.createStarted();
113 private void onRecoveredSnapshot(final SnapshotOffer offer) {
114 log.debug("{}: SnapshotOffer called.", context.getId());
116 initRecoveryTimers();
118 Snapshot snapshot = (Snapshot) offer.snapshot();
120 for (ReplicatedLogEntry entry: snapshot.getUnAppliedEntries()) {
121 if (isMigratedPayload(entry)) {
122 hasMigratedDataRecovered = true;
126 if (!context.getPersistenceProvider().isRecoveryApplicable()) {
127 // We may have just transitioned to disabled and have a snapshot containing state data and/or log
128 // entries - we don't want to preserve these, only the server config and election term info.
130 snapshot = Snapshot.create(
131 EmptyState.INSTANCE, Collections.emptyList(), -1, -1, -1, -1,
132 snapshot.getElectionTerm(), snapshot.getElectionVotedFor(), snapshot.getServerConfiguration());
135 // Create a replicated log with the snapshot information
136 // The replicated log can be used later on to retrieve this snapshot
137 // when we need to install it on a peer
139 context.setReplicatedLog(ReplicatedLogImpl.newInstance(snapshot, context));
140 context.setLastApplied(snapshot.getLastAppliedIndex());
141 context.setCommitIndex(snapshot.getLastAppliedIndex());
142 context.getTermInformation().update(snapshot.getElectionTerm(), snapshot.getElectionVotedFor());
144 final Stopwatch timer = Stopwatch.createStarted();
146 // Apply the snapshot to the actors state
147 final State snapshotState = snapshot.getState();
148 if (snapshotState.needsMigration()) {
149 hasMigratedDataRecovered = true;
151 if (!(snapshotState instanceof EmptyState)) {
152 cohort.applyRecoverySnapshot(snapshotState);
155 if (snapshot.getServerConfiguration() != null) {
156 context.updatePeerIds(snapshot.getServerConfiguration());
160 log.info("Recovery snapshot applied for {} in {}: snapshotIndex={}, snapshotTerm={}, journal-size={}",
161 context.getId(), timer, replicatedLog().getSnapshotIndex(), replicatedLog().getSnapshotTerm(),
162 replicatedLog().size());
165 private void onRecoveredJournalLogEntry(final ReplicatedLogEntry logEntry) {
166 if (log.isDebugEnabled()) {
167 log.debug("{}: Received ReplicatedLogEntry for recovery: index: {}, size: {}", context.getId(),
168 logEntry.getIndex(), logEntry.size());
171 if (isServerConfigurationPayload(logEntry)) {
172 context.updatePeerIds((ServerConfigurationPayload)logEntry.getData());
175 if (isMigratedPayload(logEntry)) {
176 hasMigratedDataRecovered = true;
179 if (context.getPersistenceProvider().isRecoveryApplicable()) {
180 replicatedLog().append(logEntry);
181 } else if (!isPersistentPayload(logEntry)) {
182 dataRecoveredWithPersistenceDisabled = true;
186 private void onRecoveredApplyLogEntries(final long toIndex) {
187 if (!context.getPersistenceProvider().isRecoveryApplicable()) {
188 dataRecoveredWithPersistenceDisabled = true;
192 long lastUnappliedIndex = context.getLastApplied() + 1;
194 if (log.isDebugEnabled()) {
195 // it can happen that lastUnappliedIndex > toIndex, if the AJE is in the persistent journal
196 // but the entry itself has made it to that state and recovered via the snapshot
197 log.debug("{}: Received apply journal entries for recovery, applying to state: {} to {}",
198 context.getId(), lastUnappliedIndex, toIndex);
201 long lastApplied = lastUnappliedIndex - 1;
202 for (long i = lastUnappliedIndex; i <= toIndex; i++) {
203 ReplicatedLogEntry logEntry = replicatedLog().get(i);
204 if (logEntry != null) {
206 batchRecoveredLogEntry(logEntry);
207 if (shouldTakeRecoverySnapshot() && !context.getSnapshotManager().isCapturing()) {
208 if (currentRecoveryBatchCount > 0) {
209 endCurrentLogRecoveryBatch();
211 context.setLastApplied(lastApplied);
212 context.setCommitIndex(lastApplied);
213 takeRecoverySnapshot(logEntry);
216 // Shouldn't happen but cover it anyway.
217 log.error("{}: Log entry not found for index {}", context.getId(), i);
222 context.setLastApplied(lastApplied);
223 context.setCommitIndex(lastApplied);
226 private void onDeleteEntries(final DeleteEntries deleteEntries) {
227 if (context.getPersistenceProvider().isRecoveryApplicable()) {
228 replicatedLog().removeFrom(deleteEntries.getFromIndex());
230 dataRecoveredWithPersistenceDisabled = true;
234 private void batchRecoveredLogEntry(final ReplicatedLogEntry logEntry) {
235 initRecoveryTimers();
237 int batchSize = context.getConfigParams().getJournalRecoveryLogBatchSize();
238 if (!isServerConfigurationPayload(logEntry)) {
239 if (currentRecoveryBatchCount == 0) {
240 cohort.startLogRecoveryBatch(batchSize);
243 cohort.appendRecoveredLogEntry(logEntry.getData());
245 if (++currentRecoveryBatchCount >= batchSize) {
246 endCurrentLogRecoveryBatch();
251 private void takeRecoverySnapshot(final ReplicatedLogEntry logEntry) {
252 log.info("Time for recovery snapshot on entry with index {}", logEntry.getIndex());
253 final SnapshotManager snapshotManager = context.getSnapshotManager();
254 if (snapshotManager.capture(logEntry, -1)) {
255 log.info("Capturing snapshot, resetting timer for the next recovery snapshot interval.");
256 recoverySnapshotTimer.reset().start();
258 log.info("SnapshotManager is not able to capture snapshot at this time. It will be retried "
259 + "again with the next recovered entry.");
263 private boolean shouldTakeRecoverySnapshot() {
264 return recoverySnapshotTimer != null && recoverySnapshotTimer.elapsed(TimeUnit.SECONDS)
265 >= context.getConfigParams().getRecoverySnapshotIntervalSeconds();
268 private void endCurrentLogRecoveryBatch() {
269 cohort.applyCurrentLogRecoveryBatch();
270 currentRecoveryBatchCount = 0;
273 private void onRecoveryCompletedMessage(final PersistentDataProvider persistentProvider) {
274 if (currentRecoveryBatchCount > 0) {
275 endCurrentLogRecoveryBatch();
278 final String recoveryTime;
279 if (recoveryTimer != null) {
280 recoveryTime = " in " + recoveryTimer.stop();
281 recoveryTimer = null;
286 if (recoverySnapshotTimer != null) {
287 recoverySnapshotTimer.stop();
288 recoverySnapshotTimer = null;
291 log.info("{}: Recovery completed {} - Switching actor to Follower - last log index = {}, last log term = {}, "
292 + "snapshot index = {}, snapshot term = {}, journal size = {}", context.getId(), recoveryTime,
293 replicatedLog().lastIndex(), replicatedLog().lastTerm(), replicatedLog().getSnapshotIndex(),
294 replicatedLog().getSnapshotTerm(), replicatedLog().size());
296 if (dataRecoveredWithPersistenceDisabled
297 || hasMigratedDataRecovered && !context.getPersistenceProvider().isRecoveryApplicable()) {
298 if (hasMigratedDataRecovered) {
299 log.info("{}: Saving snapshot after recovery due to migrated messages", context.getId());
301 log.info("{}: Saving snapshot after recovery due to data persistence disabled", context.getId());
304 // Either data persistence is disabled and we recovered some data entries (ie we must have just
305 // transitioned to disabled or a persistence backup was restored) or we recovered migrated
306 // messages. Either way, we persist a snapshot and delete all the messages from the akka journal
307 // to clean out unwanted messages.
309 Snapshot snapshot = Snapshot.create(
310 EmptyState.INSTANCE, Collections.<ReplicatedLogEntry>emptyList(),
312 context.getTermInformation().getCurrentTerm(), context.getTermInformation().getVotedFor(),
313 context.getPeerServerInfo(true));
315 persistentProvider.saveSnapshot(snapshot);
317 persistentProvider.deleteMessages(persistentProvider.getLastSequenceNumber());
318 } else if (hasMigratedDataRecovered) {
319 log.info("{}: Snapshot capture initiated after recovery due to migrated messages", context.getId());
321 context.getSnapshotManager().capture(replicatedLog().last(), -1);
323 possiblyRestoreFromSnapshot();
327 private static boolean isServerConfigurationPayload(final ReplicatedLogEntry repLogEntry) {
328 return repLogEntry.getData() instanceof ServerConfigurationPayload;
331 private static boolean isPersistentPayload(final ReplicatedLogEntry repLogEntry) {
332 return repLogEntry.getData() instanceof PersistentPayload;
335 private static boolean isMigratedPayload(final ReplicatedLogEntry repLogEntry) {
336 return isMigratedSerializable(repLogEntry.getData());
339 private static boolean isMigratedSerializable(final Object message) {
340 return message instanceof MigratedSerializable migrated && migrated.isMigrated();