X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2FRaftActor.java;h=2459c2ff8b1764d3cd3b56be90fc7ea5191d65b6;hb=139937c2e646894af6a9b2b8a8a1047c6ef82485;hp=66a46ef3bde0ca8a8d1fa8fe1056ccb463a594d6;hpb=b5167b9bc04f2792b275cfe0eac78c0f5eb9442d;p=controller.git diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java index 66a46ef3bd..2459c2ff8b 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java @@ -18,10 +18,11 @@ import akka.persistence.SaveSnapshotFailure; import akka.persistence.SaveSnapshotSuccess; import akka.persistence.SnapshotOffer; import akka.persistence.SnapshotSelectionCriteria; -import akka.persistence.UntypedPersistentActor; import com.google.common.base.Optional; import com.google.common.base.Stopwatch; import com.google.protobuf.ByteString; +import org.opendaylight.controller.cluster.DataPersistenceProvider; +import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActor; import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries; import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot; import org.opendaylight.controller.cluster.raft.base.messages.ApplyState; @@ -38,6 +39,7 @@ import org.opendaylight.controller.cluster.raft.client.messages.RemoveRaftPeer; import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages; + import java.io.Serializable; import java.util.Map; @@ -81,7 +83,7 @@ import java.util.Map; *
  • when a snapshot should be saved
  • * */ -public abstract class RaftActor extends UntypedPersistentActor { +public abstract class RaftActor extends AbstractUntypedPersistentActor { protected final LoggingAdapter LOG = Logging.getLogger(getContext().system(), this); @@ -135,24 +137,40 @@ public abstract class RaftActor extends UntypedPersistentActor { public void preStart() throws Exception { LOG.info("Starting recovery for {} with journal batch size {}", persistenceId(), context.getConfigParams().getJournalRecoveryLogBatchSize()); + super.preStart(); } @Override - public void onReceiveRecover(Object message) { - if (message instanceof SnapshotOffer) { - onRecoveredSnapshot((SnapshotOffer)message); - } else if (message instanceof ReplicatedLogEntry) { - onRecoveredJournalLogEntry((ReplicatedLogEntry)message); - } else if (message instanceof ApplyLogEntries) { - onRecoveredApplyLogEntries((ApplyLogEntries)message); - } else if (message instanceof DeleteEntries) { - replicatedLog.removeFrom(((DeleteEntries) message).getFromIndex()); - } else if (message instanceof UpdateElectionTerm) { - context.getTermInformation().update(((UpdateElectionTerm) message).getCurrentTerm(), - ((UpdateElectionTerm) message).getVotedFor()); - } else if (message instanceof RecoveryCompleted) { - onRecoveryCompletedMessage(); + public void handleRecover(Object message) { + if(persistence().isRecoveryApplicable()) { + if (message instanceof SnapshotOffer) { + onRecoveredSnapshot((SnapshotOffer) message); + } else if (message instanceof ReplicatedLogEntry) { + onRecoveredJournalLogEntry((ReplicatedLogEntry) message); + } else if (message instanceof ApplyLogEntries) { + onRecoveredApplyLogEntries((ApplyLogEntries) message); + } else if (message instanceof DeleteEntries) { + replicatedLog.removeFrom(((DeleteEntries) message).getFromIndex()); + } else if (message instanceof UpdateElectionTerm) { + context.getTermInformation().update(((UpdateElectionTerm) message).getCurrentTerm(), + ((UpdateElectionTerm) message).getVotedFor()); + } else if (message instanceof RecoveryCompleted) { + onRecoveryCompletedMessage(); + } + } else { + if (message instanceof RecoveryCompleted) { + // Delete all the messages from the akka journal so that we do not end up with consistency issues + // Note I am not using the dataPersistenceProvider and directly using the akka api here + deleteMessages(lastSequenceNr()); + + // Delete all the akka snapshots as they will not be needed + deleteSnapshots(new SnapshotSelectionCriteria(scala.Long.MaxValue(), scala.Long.MaxValue())); + + onRecoveryComplete(); + currentBehavior = new Follower(context); + onStateChanged(); + } } } @@ -254,7 +272,7 @@ public abstract class RaftActor extends UntypedPersistentActor { onStateChanged(); } - @Override public void onReceiveCommand(Object message) { + @Override public void handleCommand(Object message) { if (message instanceof ApplyState){ ApplyState applyState = (ApplyState) message; @@ -272,7 +290,7 @@ public abstract class RaftActor extends UntypedPersistentActor { if(LOG.isDebugEnabled()) { LOG.debug("Persisting ApplyLogEntries with index={}", ale.getToIndex()); } - persist(new ApplyLogEntries(ale.getToIndex()), new Procedure() { + persistence().persist(new ApplyLogEntries(ale.getToIndex()), new Procedure() { @Override public void apply(ApplyLogEntries param) throws Exception { } @@ -304,10 +322,9 @@ public abstract class RaftActor extends UntypedPersistentActor { SaveSnapshotSuccess success = (SaveSnapshotSuccess) message; LOG.info("SaveSnapshotSuccess received for snapshot"); - context.getReplicatedLog().snapshotCommit(); + long sequenceNumber = success.metadata().sequenceNr(); - // TODO: Not sure if we want to be this aggressive with trimming stuff - trimPersistentData(success.metadata().sequenceNr()); + commitSnapshot(sequenceNumber); } else if (message instanceof SaveSnapshotFailure) { SaveSnapshotFailure saveSnapshotFailure = (SaveSnapshotFailure) message; @@ -485,7 +502,12 @@ public abstract class RaftActor extends UntypedPersistentActor { context.setPeerAddress(peerId, peerAddress); } + protected void commitSnapshot(long sequenceNumber) { + context.getReplicatedLog().snapshotCommit(); + // TODO: Not sure if we want to be this aggressive with trimming stuff + trimPersistentData(sequenceNumber); + } /** * The applyState method will be called by the RaftActor when some data @@ -515,7 +537,7 @@ public abstract class RaftActor extends UntypedPersistentActor { /** * This method is called during recovery to append state data to the current batch. This method - * is called 1 or more times after {@link #startRecoveryStateBatch}. + * is called 1 or more times after {@link #startLogRecoveryBatch}. * * @param data the state data */ @@ -530,7 +552,7 @@ public abstract class RaftActor extends UntypedPersistentActor { /** * This method is called during recovery at the end of a batch to apply the current batched - * log entries. This method is called after {@link #appendRecoveryLogEntry}. + * log entries. This method is called after {@link #appendRecoveredLogEntry}. */ protected abstract void applyCurrentLogRecoveryBatch(); @@ -566,17 +588,19 @@ public abstract class RaftActor extends UntypedPersistentActor { */ protected abstract void onStateChanged(); + protected abstract DataPersistenceProvider persistence(); + protected void onLeaderChanged(String oldLeader, String newLeader){}; private void trimPersistentData(long sequenceNumber) { // Trim akka snapshots // FIXME : Not sure how exactly the SnapshotSelectionCriteria is applied // For now guessing that it is ANDed. - deleteSnapshots(new SnapshotSelectionCriteria( + persistence().deleteSnapshots(new SnapshotSelectionCriteria( sequenceNumber - context.getConfigParams().getSnapshotBatchCount(), 43200000)); // Trim akka journal - deleteMessages(sequenceNumber); + persistence().deleteMessages(sequenceNumber); } private String getLeaderAddress(){ @@ -605,7 +629,7 @@ public abstract class RaftActor extends UntypedPersistentActor { captureSnapshot.getLastIndex(), captureSnapshot.getLastTerm(), captureSnapshot.getLastAppliedIndex(), captureSnapshot.getLastAppliedTerm()); - saveSnapshot(sn); + persistence().saveSnapshot(sn); LOG.info("Persisting of snapshot done:{}", sn.getLogMessage()); @@ -647,7 +671,7 @@ public abstract class RaftActor extends UntypedPersistentActor { // FIXME: Maybe this should be done after the command is saved journal.subList(adjustedIndex , journal.size()).clear(); - persist(new DeleteEntries(adjustedIndex), new Procedure(){ + persistence().persist(new DeleteEntries(adjustedIndex), new Procedure(){ @Override public void apply(DeleteEntries param) throws Exception { @@ -677,7 +701,7 @@ public abstract class RaftActor extends UntypedPersistentActor { // persist call and the execution(s) of the associated event // handler. This also holds for multiple persist calls in context // of a single command. - persist(replicatedLogEntry, + persistence().persist(replicatedLogEntry, new Procedure() { @Override public void apply(ReplicatedLogEntry evt) throws Exception { @@ -723,7 +747,7 @@ public abstract class RaftActor extends UntypedPersistentActor { } - private static class DeleteEntries implements Serializable { + static class DeleteEntries implements Serializable { private final int fromIndex; @@ -766,7 +790,7 @@ public abstract class RaftActor extends UntypedPersistentActor { public void updateAndPersist(long currentTerm, String votedFor){ update(currentTerm, votedFor); // FIXME : Maybe first persist then update the state - persist(new UpdateElectionTerm(this.currentTerm, this.votedFor), new Procedure(){ + persistence().persist(new UpdateElectionTerm(this.currentTerm, this.votedFor), new Procedure(){ @Override public void apply(UpdateElectionTerm param) throws Exception { @@ -776,7 +800,7 @@ public abstract class RaftActor extends UntypedPersistentActor { } } - private static class UpdateElectionTerm implements Serializable { + static class UpdateElectionTerm implements Serializable { private final long currentTerm; private final String votedFor; @@ -794,4 +818,29 @@ public abstract class RaftActor extends UntypedPersistentActor { } } + protected class NonPersistentRaftDataProvider extends NonPersistentDataProvider { + + public NonPersistentRaftDataProvider(){ + + } + + /** + * The way snapshotting works is, + *
      + *
    1. RaftActor calls createSnapshot on the Shard + *
    2. Shard sends a CaptureSnapshotReply and RaftActor then calls saveSnapshot + *
    3. When saveSnapshot is invoked on the akka-persistence API it uses the SnapshotStore to save the snapshot. + * The SnapshotStore sends SaveSnapshotSuccess or SaveSnapshotFailure. When the RaftActor gets SaveSnapshot + * success it commits the snapshot to the in-memory journal. This commitSnapshot is mimicking what is done + * in SaveSnapshotSuccess. + *
    + * @param o + */ + @Override + public void saveSnapshot(Object o) { + // Make saving Snapshot successful + commitSnapshot(-1L); + } + } + }