import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import akka.japi.Procedure;
-import akka.persistence.RecoveryCompleted;
-import akka.persistence.SaveSnapshotFailure;
-import akka.persistence.SaveSnapshotSuccess;
-import akka.persistence.SnapshotOffer;
import akka.persistence.SnapshotSelectionCriteria;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Objects;
import com.google.common.base.Optional;
-import com.google.common.base.Stopwatch;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import java.io.Serializable;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
+import javax.annotation.Nonnull;
import org.apache.commons.lang3.time.DurationFormatUtils;
import org.opendaylight.controller.cluster.DataPersistenceProvider;
import org.opendaylight.controller.cluster.DelegatingPersistentDataProvider;
import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
import org.opendaylight.controller.cluster.notifications.RoleChanged;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
-import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
-import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
-import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
-import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader;
import org.opendaylight.controller.cluster.raft.behaviors.DelegatingRaftActorBehavior;
private static final long APPLY_STATE_DELAY_THRESHOLD_IN_NANOS = TimeUnit.MILLISECONDS.toNanos(50L); // 50 millis
- private static final String COMMIT_SNAPSHOT = "commit_snapshot";
-
protected final Logger LOG = LoggerFactory.getLogger(getClass());
/**
private final DelegatingPersistentDataProvider delegatingPersistenceProvider = new DelegatingPersistentDataProvider(null);
- private final Procedure<Void> createSnapshotProcedure = new CreateSnapshotProcedure();
-
- private Stopwatch recoveryTimer;
+ private RaftActorRecoverySupport raftRecovery;
- private int currentRecoveryBatchCount;
+ private RaftActorSnapshotMessageSupport snapshotSupport;
private final BehaviorStateHolder reusableBehaviorStateHolder = new BehaviorStateHolder();
context.setReplicatedLog(ReplicatedLogImpl.newInstance(context, delegatingPersistenceProvider, currentBehavior));
}
- private void initRecoveryTimer() {
- if(recoveryTimer == null) {
- recoveryTimer = Stopwatch.createStarted();
- }
- }
-
@Override
public void preStart() throws Exception {
LOG.info("Starting recovery for {} with journal batch size {}", persistenceId(),
@Override
public void postStop() {
- if(currentBehavior != null) {
+ if(currentBehavior.getDelegate() != null) {
try {
currentBehavior.close();
} catch (Exception e) {
@Override
public void handleRecover(Object message) {
- if(persistence().isRecoveryApplicable()) {
- if (message instanceof SnapshotOffer) {
- onRecoveredSnapshot((SnapshotOffer) message);
- } else if (message instanceof ReplicatedLogEntry) {
- onRecoveredJournalLogEntry((ReplicatedLogEntry) message);
- } else if (message instanceof ApplyLogEntries) {
- // Handle this message for backwards compatibility with pre-Lithium versions.
- onRecoveredApplyLogEntries(((ApplyLogEntries) message).getToIndex());
- } else if (message instanceof ApplyJournalEntries) {
- onRecoveredApplyLogEntries(((ApplyJournalEntries) message).getToIndex());
- } else if (message instanceof DeleteEntries) {
- replicatedLog().removeFrom(((DeleteEntries) message).getFromIndex());
- } else if (message instanceof UpdateElectionTerm) {
- context.getTermInformation().update(((UpdateElectionTerm) message).getCurrentTerm(),
- ((UpdateElectionTerm) message).getVotedFor());
- } else if (message instanceof RecoveryCompleted) {
- onRecoveryCompletedMessage();
- }
- } else {
- if (message instanceof RecoveryCompleted) {
+ if(raftRecovery == null) {
+ raftRecovery = new RaftActorRecoverySupport(delegatingPersistenceProvider, context, currentBehavior,
+ getRaftActorRecoveryCohort());
+ }
+
+ boolean recoveryComplete = raftRecovery.handleRecoveryMessage(message);
+ if(recoveryComplete) {
+ if(!persistence().isRecoveryApplicable()) {
// Delete all the messages from the akka journal so that we do not end up with consistency issues
// Note I am not using the dataPersistenceProvider and directly using the akka api here
deleteMessages(lastSequenceNr());
// Delete all the akka snapshots as they will not be needed
deleteSnapshots(new SnapshotSelectionCriteria(scala.Long.MaxValue(), scala.Long.MaxValue()));
-
- onRecoveryComplete();
-
- initializeBehavior();
}
- }
- }
-
- private void onRecoveredSnapshot(SnapshotOffer offer) {
- if(LOG.isDebugEnabled()) {
- LOG.debug("{}: SnapshotOffer called..", persistenceId());
- }
-
- initRecoveryTimer();
-
- Snapshot snapshot = (Snapshot) offer.snapshot();
- // Create a replicated log with the snapshot information
- // The replicated log can be used later on to retrieve this snapshot
- // when we need to install it on a peer
+ onRecoveryComplete();
- context.setReplicatedLog(ReplicatedLogImpl.newInstance(snapshot, context, delegatingPersistenceProvider,
- currentBehavior));
- context.setLastApplied(snapshot.getLastAppliedIndex());
- context.setCommitIndex(snapshot.getLastAppliedIndex());
+ initializeBehavior();
- Stopwatch timer = Stopwatch.createStarted();
-
- // Apply the snapshot to the actors state
- applyRecoverySnapshot(snapshot.getState());
-
- timer.stop();
- LOG.info("Recovery snapshot applied for {} in {}: snapshotIndex={}, snapshotTerm={}, journal-size=" +
- replicatedLog().size(), persistenceId(), timer.toString(),
- replicatedLog().getSnapshotIndex(), replicatedLog().getSnapshotTerm());
- }
-
- private void onRecoveredJournalLogEntry(ReplicatedLogEntry logEntry) {
- if(LOG.isDebugEnabled()) {
- LOG.debug("{}: Received ReplicatedLogEntry for recovery: {}", persistenceId(), logEntry.getIndex());
+ raftRecovery = null;
}
-
- replicatedLog().append(logEntry);
- }
-
- private void onRecoveredApplyLogEntries(long toIndex) {
- if(LOG.isDebugEnabled()) {
- LOG.debug("{}: Received ApplyLogEntries for recovery, applying to state: {} to {}",
- persistenceId(), context.getLastApplied() + 1, toIndex);
- }
-
- for (long i = context.getLastApplied() + 1; i <= toIndex; i++) {
- batchRecoveredLogEntry(replicatedLog().get(i));
- }
-
- context.setLastApplied(toIndex);
- context.setCommitIndex(toIndex);
- }
-
- private void batchRecoveredLogEntry(ReplicatedLogEntry logEntry) {
- initRecoveryTimer();
-
- int batchSize = context.getConfigParams().getJournalRecoveryLogBatchSize();
- if(currentRecoveryBatchCount == 0) {
- startLogRecoveryBatch(batchSize);
- }
-
- appendRecoveredLogEntry(logEntry.getData());
-
- if(++currentRecoveryBatchCount >= batchSize) {
- endCurrentLogRecoveryBatch();
- }
- }
-
- private void endCurrentLogRecoveryBatch() {
- applyCurrentLogRecoveryBatch();
- currentRecoveryBatchCount = 0;
- }
-
- private void onRecoveryCompletedMessage() {
- if(currentRecoveryBatchCount > 0) {
- endCurrentLogRecoveryBatch();
- }
-
- onRecoveryComplete();
-
- String recoveryTime = "";
- if(recoveryTimer != null) {
- recoveryTimer.stop();
- recoveryTime = " in " + recoveryTimer.toString();
- recoveryTimer = null;
- }
-
- LOG.info(
- "Recovery completed" + recoveryTime + " - Switching actor to Follower - " +
- "Persistence Id = " + persistenceId() +
- " Last index in log={}, snapshotIndex={}, snapshotTerm={}, " +
- "journal-size={}",
- replicatedLog().lastIndex(), replicatedLog().getSnapshotIndex(),
- replicatedLog().getSnapshotTerm(), replicatedLog().size());
-
- initializeBehavior();
}
protected void initializeBehavior(){
handleBehaviorChange(reusableBehaviorStateHolder, getCurrentBehavior());
}
- @Override public void handleCommand(Object message) {
+ @Override
+ public void handleCommand(Object message) {
+ if(snapshotSupport == null) {
+ snapshotSupport = new RaftActorSnapshotMessageSupport(delegatingPersistenceProvider, context,
+ currentBehavior, getRaftActorSnapshotCohort(), self());
+ }
+
+ boolean handled = snapshotSupport.handleSnapshotMessage(message);
+ if(handled) {
+ return;
+ }
+
if (message instanceof ApplyState){
ApplyState applyState = (ApplyState) message;
persistence().persist(applyEntries, NoopProcedure.instance());
- } else if(message instanceof ApplySnapshot ) {
- Snapshot snapshot = ((ApplySnapshot) message).getSnapshot();
-
- if(LOG.isDebugEnabled()) {
- LOG.debug("{}: ApplySnapshot called on Follower Actor " +
- "snapshotIndex:{}, snapshotTerm:{}", persistenceId(), snapshot.getLastAppliedIndex(),
- snapshot.getLastAppliedTerm()
- );
- }
-
- applySnapshot(snapshot.getState());
-
- //clears the followers log, sets the snapshot index to ensure adjusted-index works
- context.setReplicatedLog(ReplicatedLogImpl.newInstance(snapshot, context, delegatingPersistenceProvider,
- currentBehavior));
- context.setLastApplied(snapshot.getLastAppliedIndex());
-
} else if (message instanceof FindLeader) {
getSender().tell(
new FindLeaderReply(getLeaderAddress()),
getSelf()
);
-
- } else if (message instanceof SaveSnapshotSuccess) {
- SaveSnapshotSuccess success = (SaveSnapshotSuccess) message;
- LOG.info("{}: SaveSnapshotSuccess received for snapshot", persistenceId());
-
- long sequenceNumber = success.metadata().sequenceNr();
-
- commitSnapshot(sequenceNumber);
-
- } else if (message instanceof SaveSnapshotFailure) {
- SaveSnapshotFailure saveSnapshotFailure = (SaveSnapshotFailure) message;
-
- LOG.error("{}: SaveSnapshotFailure received for snapshot Cause:",
- persistenceId(), saveSnapshotFailure.cause());
-
- context.getSnapshotManager().rollback();
-
- } else if (message instanceof CaptureSnapshot) {
- LOG.debug("{}: CaptureSnapshot received by actor: {}", persistenceId(), message);
-
- context.getSnapshotManager().create(createSnapshotProcedure);
-
- } else if (message instanceof CaptureSnapshotReply) {
- handleCaptureSnapshotReply(((CaptureSnapshotReply) message).getSnapshot());
} else if(message instanceof GetOnDemandRaftState) {
onGetOnDemandRaftStats();
- } else if (message.equals(COMMIT_SNAPSHOT)) {
- commitSnapshot(-1);
} else {
reusableBehaviorStateHolder.init(getCurrentBehavior());
// Make saving Snapshot successful
// Committing the snapshot here would end up calling commit in the creating state which would
// be a state violation. That's why now we send a message to commit the snapshot.
- self().tell(COMMIT_SNAPSHOT, self());
+ self().tell(RaftActorSnapshotMessageSupport.COMMIT_SNAPSHOT, self());
}
});
}
context.setPeerAddress(peerId, peerAddress);
}
- protected void commitSnapshot(long sequenceNumber) {
- context.getSnapshotManager().commit(persistence(), sequenceNumber);
- }
-
/**
* The applyState method will be called by the RaftActor when some data
* needs to be applied to the actor's state
Object data);
/**
- * This method is called during recovery at the start of a batch of state entries. Derived
- * classes should perform any initialization needed to start a batch.
+ * Returns the RaftActorRecoveryCohort to participate in persistence recovery.
*/
- protected abstract void startLogRecoveryBatch(int maxBatchSize);
-
- /**
- * This method is called during recovery to append state data to the current batch. This method
- * is called 1 or more times after {@link #startLogRecoveryBatch}.
- *
- * @param data the state data
- */
- protected abstract void appendRecoveredLogEntry(Payload data);
-
- /**
- * This method is called during recovery to reconstruct the state of the actor.
- *
- * @param snapshotBytes A snapshot of the state of the actor
- */
- protected abstract void applyRecoverySnapshot(byte[] snapshotBytes);
-
- /**
- * This method is called during recovery at the end of a batch to apply the current batched
- * log entries. This method is called after {@link #appendRecoveredLogEntry}.
- */
- protected abstract void applyCurrentLogRecoveryBatch();
+ @Nonnull
+ protected abstract RaftActorRecoveryCohort getRaftActorRecoveryCohort();
/**
* This method is called when recovery is complete.
protected abstract void onRecoveryComplete();
/**
- * This method will be called by the RaftActor when a snapshot needs to be
- * created. The derived actor should respond with its current state.
- * <p/>
- * During recovery the state that is returned by the derived actor will
- * be passed back to it by calling the applySnapshot method
- *
- * @return The current state of the actor
+ * Returns the RaftActorSnapshotCohort to participate in persistence recovery.
*/
- protected abstract void createSnapshot();
-
- /**
- * This method can be called at any other point during normal
- * operations when the derived actor is out of sync with it's peers
- * and the only way to bring it in sync is by applying a snapshot
- *
- * @param snapshotBytes A snapshot of the state of the actor
- */
- protected abstract void applySnapshot(byte[] snapshotBytes);
+ @Nonnull
+ protected abstract RaftActorSnapshotCohort getRaftActorSnapshotCohort();
/**
* This method will be called by the RaftActor when the state of the
return peerAddress;
}
- private void handleCaptureSnapshotReply(byte[] snapshotBytes) {
- LOG.debug("{}: CaptureSnapshotReply received by actor: snapshot size {}", persistenceId(), snapshotBytes.length);
-
- context.getSnapshotManager().persist(persistence(), snapshotBytes, currentBehavior, context.getTotalMemory());
- }
-
protected boolean hasFollowers(){
return getRaftActorContext().hasFollowers();
}
}
}
- private class CreateSnapshotProcedure implements Procedure<Void> {
-
- @Override
- public void apply(Void aVoid) throws Exception {
- createSnapshot();
- }
- }
-
private static class BehaviorStateHolder {
private RaftActorBehavior behavior;
private String leaderId;