import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import akka.japi.Procedure;
-import akka.persistence.RecoveryCompleted;
import akka.persistence.SaveSnapshotFailure;
import akka.persistence.SaveSnapshotSuccess;
-import akka.persistence.SnapshotOffer;
import akka.persistence.SnapshotSelectionCriteria;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Objects;
import com.google.common.base.Optional;
-import com.google.common.base.Stopwatch;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import java.io.Serializable;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
+import javax.annotation.Nonnull;
import org.apache.commons.lang3.time.DurationFormatUtils;
import org.opendaylight.controller.cluster.DataPersistenceProvider;
import org.opendaylight.controller.cluster.DelegatingPersistentDataProvider;
import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
import org.opendaylight.controller.cluster.notifications.RoleChanged;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
-import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
private final Procedure<Void> createSnapshotProcedure = new CreateSnapshotProcedure();
- private Stopwatch recoveryTimer;
-
- private int currentRecoveryBatchCount;
+ private RaftActorRecoverySupport raftRecovery;
private final BehaviorStateHolder reusableBehaviorStateHolder = new BehaviorStateHolder();
context.setReplicatedLog(ReplicatedLogImpl.newInstance(context, delegatingPersistenceProvider, currentBehavior));
}
- private void initRecoveryTimer() {
- if(recoveryTimer == null) {
- recoveryTimer = Stopwatch.createStarted();
- }
- }
-
@Override
public void preStart() throws Exception {
LOG.info("Starting recovery for {} with journal batch size {}", persistenceId(),
@Override
public void handleRecover(Object message) {
- if(persistence().isRecoveryApplicable()) {
- if (message instanceof SnapshotOffer) {
- onRecoveredSnapshot((SnapshotOffer) message);
- } else if (message instanceof ReplicatedLogEntry) {
- onRecoveredJournalLogEntry((ReplicatedLogEntry) message);
- } else if (message instanceof ApplyLogEntries) {
- // Handle this message for backwards compatibility with pre-Lithium versions.
- onRecoveredApplyLogEntries(((ApplyLogEntries) message).getToIndex());
- } else if (message instanceof ApplyJournalEntries) {
- onRecoveredApplyLogEntries(((ApplyJournalEntries) message).getToIndex());
- } else if (message instanceof DeleteEntries) {
- replicatedLog().removeFrom(((DeleteEntries) message).getFromIndex());
- } else if (message instanceof UpdateElectionTerm) {
- context.getTermInformation().update(((UpdateElectionTerm) message).getCurrentTerm(),
- ((UpdateElectionTerm) message).getVotedFor());
- } else if (message instanceof RecoveryCompleted) {
- onRecoveryCompletedMessage();
- }
- } else {
- if (message instanceof RecoveryCompleted) {
+ if(raftRecovery == null) {
+ raftRecovery = new RaftActorRecoverySupport(delegatingPersistenceProvider, context, currentBehavior,
+ getRaftActorRecoveryCohort());
+ }
+
+ boolean recoveryComplete = raftRecovery.handleRecoveryMessage(message);
+ if(recoveryComplete) {
+ if(!persistence().isRecoveryApplicable()) {
// Delete all the messages from the akka journal so that we do not end up with consistency issues
// Note I am not using the dataPersistenceProvider and directly using the akka api here
deleteMessages(lastSequenceNr());
// Delete all the akka snapshots as they will not be needed
deleteSnapshots(new SnapshotSelectionCriteria(scala.Long.MaxValue(), scala.Long.MaxValue()));
-
- onRecoveryComplete();
-
- initializeBehavior();
}
- }
- }
-
- private void onRecoveredSnapshot(SnapshotOffer offer) {
- if(LOG.isDebugEnabled()) {
- LOG.debug("{}: SnapshotOffer called..", persistenceId());
- }
- initRecoveryTimer();
+ onRecoveryComplete();
- Snapshot snapshot = (Snapshot) offer.snapshot();
+ initializeBehavior();
- // Create a replicated log with the snapshot information
- // The replicated log can be used later on to retrieve this snapshot
- // when we need to install it on a peer
-
- context.setReplicatedLog(ReplicatedLogImpl.newInstance(snapshot, context, delegatingPersistenceProvider,
- currentBehavior));
- context.setLastApplied(snapshot.getLastAppliedIndex());
- context.setCommitIndex(snapshot.getLastAppliedIndex());
-
- Stopwatch timer = Stopwatch.createStarted();
-
- // Apply the snapshot to the actors state
- applyRecoverySnapshot(snapshot.getState());
-
- timer.stop();
- LOG.info("Recovery snapshot applied for {} in {}: snapshotIndex={}, snapshotTerm={}, journal-size=" +
- replicatedLog().size(), persistenceId(), timer.toString(),
- replicatedLog().getSnapshotIndex(), replicatedLog().getSnapshotTerm());
- }
-
- private void onRecoveredJournalLogEntry(ReplicatedLogEntry logEntry) {
- if(LOG.isDebugEnabled()) {
- LOG.debug("{}: Received ReplicatedLogEntry for recovery: {}", persistenceId(), logEntry.getIndex());
+ raftRecovery = null;
}
-
- replicatedLog().append(logEntry);
- }
-
- private void onRecoveredApplyLogEntries(long toIndex) {
- if(LOG.isDebugEnabled()) {
- LOG.debug("{}: Received ApplyLogEntries for recovery, applying to state: {} to {}",
- persistenceId(), context.getLastApplied() + 1, toIndex);
- }
-
- for (long i = context.getLastApplied() + 1; i <= toIndex; i++) {
- batchRecoveredLogEntry(replicatedLog().get(i));
- }
-
- context.setLastApplied(toIndex);
- context.setCommitIndex(toIndex);
- }
-
- private void batchRecoveredLogEntry(ReplicatedLogEntry logEntry) {
- initRecoveryTimer();
-
- int batchSize = context.getConfigParams().getJournalRecoveryLogBatchSize();
- if(currentRecoveryBatchCount == 0) {
- startLogRecoveryBatch(batchSize);
- }
-
- appendRecoveredLogEntry(logEntry.getData());
-
- if(++currentRecoveryBatchCount >= batchSize) {
- endCurrentLogRecoveryBatch();
- }
- }
-
- private void endCurrentLogRecoveryBatch() {
- applyCurrentLogRecoveryBatch();
- currentRecoveryBatchCount = 0;
- }
-
- private void onRecoveryCompletedMessage() {
- if(currentRecoveryBatchCount > 0) {
- endCurrentLogRecoveryBatch();
- }
-
- onRecoveryComplete();
-
- String recoveryTime = "";
- if(recoveryTimer != null) {
- recoveryTimer.stop();
- recoveryTime = " in " + recoveryTimer.toString();
- recoveryTimer = null;
- }
-
- LOG.info(
- "Recovery completed" + recoveryTime + " - Switching actor to Follower - " +
- "Persistence Id = " + persistenceId() +
- " Last index in log={}, snapshotIndex={}, snapshotTerm={}, " +
- "journal-size={}",
- replicatedLog().lastIndex(), replicatedLog().getSnapshotIndex(),
- replicatedLog().getSnapshotTerm(), replicatedLog().size());
-
- initializeBehavior();
}
protected void initializeBehavior(){
Object data);
/**
- * This method is called during recovery at the start of a batch of state entries. Derived
- * classes should perform any initialization needed to start a batch.
- */
- protected abstract void startLogRecoveryBatch(int maxBatchSize);
-
- /**
- * This method is called during recovery to append state data to the current batch. This method
- * is called 1 or more times after {@link #startLogRecoveryBatch}.
- *
- * @param data the state data
- */
- protected abstract void appendRecoveredLogEntry(Payload data);
-
- /**
- * This method is called during recovery to reconstruct the state of the actor.
- *
- * @param snapshotBytes A snapshot of the state of the actor
- */
- protected abstract void applyRecoverySnapshot(byte[] snapshotBytes);
-
- /**
- * This method is called during recovery at the end of a batch to apply the current batched
- * log entries. This method is called after {@link #appendRecoveredLogEntry}.
+ * Returns the RaftActorRecoveryCohort to participate in persistence recovery.
*/
- protected abstract void applyCurrentLogRecoveryBatch();
+ @Nonnull
+ protected abstract RaftActorRecoveryCohort getRaftActorRecoveryCohort();
/**
* This method is called when recovery is complete.