X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2FRaftActor.java;h=2459c2ff8b1764d3cd3b56be90fc7ea5191d65b6;hb=refs%2Fchanges%2F52%2F12352%2F5;hp=64fa7496042466e58bd51cf0a488c265898866da;hpb=05a8052a457b2e53f06233f1a0b056d162118566;p=controller.git diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java index 64fa749604..2459c2ff8b 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java @@ -18,10 +18,11 @@ import akka.persistence.SaveSnapshotFailure; import akka.persistence.SaveSnapshotSuccess; import akka.persistence.SnapshotOffer; import akka.persistence.SnapshotSelectionCriteria; -import akka.persistence.UntypedPersistentActor; import com.google.common.base.Optional; import com.google.common.base.Stopwatch; import com.google.protobuf.ByteString; +import org.opendaylight.controller.cluster.DataPersistenceProvider; +import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActor; import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries; import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot; import org.opendaylight.controller.cluster.raft.base.messages.ApplyState; @@ -29,9 +30,7 @@ import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot; import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply; import org.opendaylight.controller.cluster.raft.base.messages.Replicate; import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat; -import org.opendaylight.controller.cluster.raft.behaviors.Candidate; import org.opendaylight.controller.cluster.raft.behaviors.Follower; -import org.opendaylight.controller.cluster.raft.behaviors.Leader; import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior; import org.opendaylight.controller.cluster.raft.client.messages.AddRaftPeer; import org.opendaylight.controller.cluster.raft.client.messages.FindLeader; @@ -40,6 +39,7 @@ import org.opendaylight.controller.cluster.raft.client.messages.RemoveRaftPeer; import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages; + import java.io.Serializable; import java.util.Map; @@ -83,7 +83,7 @@ import java.util.Map; *
  • when a snapshot should be saved
  • * */ -public abstract class RaftActor extends UntypedPersistentActor { +public abstract class RaftActor extends AbstractUntypedPersistentActor { protected final LoggingAdapter LOG = Logging.getLogger(getContext().system(), this); @@ -137,29 +137,47 @@ public abstract class RaftActor extends UntypedPersistentActor { public void preStart() throws Exception { LOG.info("Starting recovery for {} with journal batch size {}", persistenceId(), context.getConfigParams().getJournalRecoveryLogBatchSize()); + super.preStart(); } @Override - public void onReceiveRecover(Object message) { - if (message instanceof SnapshotOffer) { - onRecoveredSnapshot((SnapshotOffer)message); - } else if (message instanceof ReplicatedLogEntry) { - onRecoveredJournalLogEntry((ReplicatedLogEntry)message); - } else if (message instanceof ApplyLogEntries) { - onRecoveredApplyLogEntries((ApplyLogEntries)message); - } else if (message instanceof DeleteEntries) { - replicatedLog.removeFrom(((DeleteEntries) message).getFromIndex()); - } else if (message instanceof UpdateElectionTerm) { - context.getTermInformation().update(((UpdateElectionTerm) message).getCurrentTerm(), - ((UpdateElectionTerm) message).getVotedFor()); - } else if (message instanceof RecoveryCompleted) { - onRecoveryCompletedMessage(); + public void handleRecover(Object message) { + if(persistence().isRecoveryApplicable()) { + if (message instanceof SnapshotOffer) { + onRecoveredSnapshot((SnapshotOffer) message); + } else if (message instanceof ReplicatedLogEntry) { + onRecoveredJournalLogEntry((ReplicatedLogEntry) message); + } else if (message instanceof ApplyLogEntries) { + onRecoveredApplyLogEntries((ApplyLogEntries) message); + } else if (message instanceof DeleteEntries) { + replicatedLog.removeFrom(((DeleteEntries) message).getFromIndex()); + } else if (message instanceof UpdateElectionTerm) { + context.getTermInformation().update(((UpdateElectionTerm) message).getCurrentTerm(), + ((UpdateElectionTerm) message).getVotedFor()); + } else if (message instanceof RecoveryCompleted) { + onRecoveryCompletedMessage(); + } + } else { + if (message instanceof RecoveryCompleted) { + // Delete all the messages from the akka journal so that we do not end up with consistency issues + // Note I am not using the dataPersistenceProvider and directly using the akka api here + deleteMessages(lastSequenceNr()); + + // Delete all the akka snapshots as they will not be needed + deleteSnapshots(new SnapshotSelectionCriteria(scala.Long.MaxValue(), scala.Long.MaxValue())); + + onRecoveryComplete(); + currentBehavior = new Follower(context); + onStateChanged(); + } } } private void onRecoveredSnapshot(SnapshotOffer offer) { - LOG.debug("SnapshotOffer called.."); + if(LOG.isDebugEnabled()) { + LOG.debug("SnapshotOffer called.."); + } initRecoveryTimer(); @@ -250,11 +268,11 @@ public abstract class RaftActor extends UntypedPersistentActor { replicatedLog.lastIndex(), replicatedLog.snapshotIndex, replicatedLog.snapshotTerm, replicatedLog.size()); - currentBehavior = switchBehavior(RaftState.Follower); + currentBehavior = new Follower(context); onStateChanged(); } - @Override public void onReceiveCommand(Object message) { + @Override public void handleCommand(Object message) { if (message instanceof ApplyState){ ApplyState applyState = (ApplyState) message; @@ -272,7 +290,7 @@ public abstract class RaftActor extends UntypedPersistentActor { if(LOG.isDebugEnabled()) { LOG.debug("Persisting ApplyLogEntries with index={}", ale.getToIndex()); } - persist(new ApplyLogEntries(ale.getToIndex()), new Procedure() { + persistence().persist(new ApplyLogEntries(ale.getToIndex()), new Procedure() { @Override public void apply(ApplyLogEntries param) throws Exception { } @@ -304,10 +322,9 @@ public abstract class RaftActor extends UntypedPersistentActor { SaveSnapshotSuccess success = (SaveSnapshotSuccess) message; LOG.info("SaveSnapshotSuccess received for snapshot"); - context.getReplicatedLog().snapshotCommit(); + long sequenceNumber = success.metadata().sequenceNr(); - // TODO: Not sure if we want to be this aggressive with trimming stuff - trimPersistentData(success.metadata().sequenceNr()); + commitSnapshot(sequenceNumber); } else if (message instanceof SaveSnapshotFailure) { SaveSnapshotFailure saveSnapshotFailure = (SaveSnapshotFailure) message; @@ -355,14 +372,13 @@ public abstract class RaftActor extends UntypedPersistentActor { if (!(message instanceof AppendEntriesMessages.AppendEntries) && !(message instanceof AppendEntriesReply) && !(message instanceof SendHeartBeat)) { if(LOG.isDebugEnabled()) { - LOG.debug("onReceiveCommand: message:" + message.getClass()); + LOG.debug("onReceiveCommand: message: {}", message.getClass()); } } - RaftState state = - currentBehavior.handleMessage(getSender(), message); RaftActorBehavior oldBehavior = currentBehavior; - currentBehavior = switchBehavior(state); + currentBehavior = currentBehavior.handleMessage(getSender(), message); + if(oldBehavior != currentBehavior){ onStateChanged(); } @@ -486,7 +502,12 @@ public abstract class RaftActor extends UntypedPersistentActor { context.setPeerAddress(peerId, peerAddress); } + protected void commitSnapshot(long sequenceNumber) { + context.getReplicatedLog().snapshotCommit(); + // TODO: Not sure if we want to be this aggressive with trimming stuff + trimPersistentData(sequenceNumber); + } /** * The applyState method will be called by the RaftActor when some data @@ -516,7 +537,7 @@ public abstract class RaftActor extends UntypedPersistentActor { /** * This method is called during recovery to append state data to the current batch. This method - * is called 1 or more times after {@link #startRecoveryStateBatch}. + * is called 1 or more times after {@link #startLogRecoveryBatch}. * * @param data the state data */ @@ -531,7 +552,7 @@ public abstract class RaftActor extends UntypedPersistentActor { /** * This method is called during recovery at the end of a batch to apply the current batched - * log entries. This method is called after {@link #appendRecoveryLogEntry}. + * log entries. This method is called after {@link #appendRecoveredLogEntry}. */ protected abstract void applyCurrentLogRecoveryBatch(); @@ -567,49 +588,19 @@ public abstract class RaftActor extends UntypedPersistentActor { */ protected abstract void onStateChanged(); - protected void onLeaderChanged(String oldLeader, String newLeader){}; - - private RaftActorBehavior switchBehavior(RaftState state) { - if (currentBehavior != null) { - if (currentBehavior.state() == state) { - return currentBehavior; - } - LOG.info("Switching from state " + currentBehavior.state() + " to " - + state); - - try { - currentBehavior.close(); - } catch (Exception e) { - LOG.error(e, - "Failed to close behavior : " + currentBehavior.state()); - } + protected abstract DataPersistenceProvider persistence(); - } else { - LOG.info("Switching behavior to " + state); - } - RaftActorBehavior behavior = null; - if (state == RaftState.Candidate) { - behavior = new Candidate(context); - } else if (state == RaftState.Follower) { - behavior = new Follower(context); - } else { - behavior = new Leader(context); - } - - - - return behavior; - } + protected void onLeaderChanged(String oldLeader, String newLeader){}; private void trimPersistentData(long sequenceNumber) { // Trim akka snapshots // FIXME : Not sure how exactly the SnapshotSelectionCriteria is applied // For now guessing that it is ANDed. - deleteSnapshots(new SnapshotSelectionCriteria( + persistence().deleteSnapshots(new SnapshotSelectionCriteria( sequenceNumber - context.getConfigParams().getSnapshotBatchCount(), 43200000)); // Trim akka journal - deleteMessages(sequenceNumber); + persistence().deleteMessages(sequenceNumber); } private String getLeaderAddress(){ @@ -622,8 +613,8 @@ public abstract class RaftActor extends UntypedPersistentActor { } String peerAddress = context.getPeerAddress(leaderId); if(LOG.isDebugEnabled()) { - LOG.debug("getLeaderAddress leaderId = " + leaderId + " peerAddress = " - + peerAddress); + LOG.debug("getLeaderAddress leaderId = {} peerAddress = {}", + leaderId, peerAddress); } return peerAddress; @@ -638,7 +629,7 @@ public abstract class RaftActor extends UntypedPersistentActor { captureSnapshot.getLastIndex(), captureSnapshot.getLastTerm(), captureSnapshot.getLastAppliedIndex(), captureSnapshot.getLastAppliedTerm()); - saveSnapshot(sn); + persistence().saveSnapshot(sn); LOG.info("Persisting of snapshot done:{}", sn.getLogMessage()); @@ -680,7 +671,7 @@ public abstract class RaftActor extends UntypedPersistentActor { // FIXME: Maybe this should be done after the command is saved journal.subList(adjustedIndex , journal.size()).clear(); - persist(new DeleteEntries(adjustedIndex), new Procedure(){ + persistence().persist(new DeleteEntries(adjustedIndex), new Procedure(){ @Override public void apply(DeleteEntries param) throws Exception { @@ -697,8 +688,11 @@ public abstract class RaftActor extends UntypedPersistentActor { public void appendAndPersist(final ActorRef clientActor, final String identifier, final ReplicatedLogEntry replicatedLogEntry) { - context.getLogger().debug( - "Append log entry and persist {} ", replicatedLogEntry); + + if(LOG.isDebugEnabled()) { + LOG.debug("Append log entry and persist {} ", replicatedLogEntry); + } + // FIXME : By adding the replicated log entry to the in-memory journal we are not truly ensuring durability of the logs journal.add(replicatedLogEntry); @@ -707,7 +701,7 @@ public abstract class RaftActor extends UntypedPersistentActor { // persist call and the execution(s) of the associated event // handler. This also holds for multiple persist calls in context // of a single command. - persist(replicatedLogEntry, + persistence().persist(replicatedLogEntry, new Procedure() { @Override public void apply(ReplicatedLogEntry evt) throws Exception { @@ -753,7 +747,7 @@ public abstract class RaftActor extends UntypedPersistentActor { } - private static class DeleteEntries implements Serializable { + static class DeleteEntries implements Serializable { private final int fromIndex; @@ -796,7 +790,7 @@ public abstract class RaftActor extends UntypedPersistentActor { public void updateAndPersist(long currentTerm, String votedFor){ update(currentTerm, votedFor); // FIXME : Maybe first persist then update the state - persist(new UpdateElectionTerm(this.currentTerm, this.votedFor), new Procedure(){ + persistence().persist(new UpdateElectionTerm(this.currentTerm, this.votedFor), new Procedure(){ @Override public void apply(UpdateElectionTerm param) throws Exception { @@ -806,7 +800,7 @@ public abstract class RaftActor extends UntypedPersistentActor { } } - private static class UpdateElectionTerm implements Serializable { + static class UpdateElectionTerm implements Serializable { private final long currentTerm; private final String votedFor; @@ -824,4 +818,29 @@ public abstract class RaftActor extends UntypedPersistentActor { } } + protected class NonPersistentRaftDataProvider extends NonPersistentDataProvider { + + public NonPersistentRaftDataProvider(){ + + } + + /** + * The way snapshotting works is, + *
      + *
    1. RaftActor calls createSnapshot on the Shard + *
    2. Shard sends a CaptureSnapshotReply and RaftActor then calls saveSnapshot + *
    3. When saveSnapshot is invoked on the akka-persistence API it uses the SnapshotStore to save the snapshot. + * The SnapshotStore sends SaveSnapshotSuccess or SaveSnapshotFailure. When the RaftActor gets SaveSnapshot + * success it commits the snapshot to the in-memory journal. This commitSnapshot is mimicking what is done + * in SaveSnapshotSuccess. + *
    + * @param o + */ + @Override + public void saveSnapshot(Object o) { + // Make saving Snapshot successful + commitSnapshot(-1L); + } + } + }