Refactor snapshot message processing to RaftActorSnapshotMessageSupport
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / RaftActor.java
index 1f1521d797d45790dc35c747ca9f8af2d4c6dd8d..41a807aa355d394f659f488209f65e732646b120 100644 (file)
@@ -11,8 +11,6 @@ package org.opendaylight.controller.cluster.raft;
 import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
 import akka.japi.Procedure;
-import akka.persistence.SaveSnapshotFailure;
-import akka.persistence.SaveSnapshotSuccess;
 import akka.persistence.SnapshotSelectionCriteria;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Objects;
@@ -34,10 +32,7 @@ import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersisten
 import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
 import org.opendaylight.controller.cluster.notifications.RoleChanged;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
-import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
-import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
-import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
 import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader;
 import org.opendaylight.controller.cluster.raft.behaviors.DelegatingRaftActorBehavior;
@@ -96,8 +91,6 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
 
     private static final long APPLY_STATE_DELAY_THRESHOLD_IN_NANOS = TimeUnit.MILLISECONDS.toNanos(50L); // 50 millis
 
-    private static final String COMMIT_SNAPSHOT = "commit_snapshot";
-
     protected final Logger LOG = LoggerFactory.getLogger(getClass());
 
     /**
@@ -114,10 +107,10 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
 
     private final DelegatingPersistentDataProvider delegatingPersistenceProvider = new DelegatingPersistentDataProvider(null);
 
-    private final Procedure<Void> createSnapshotProcedure = new CreateSnapshotProcedure();
-
     private RaftActorRecoverySupport raftRecovery;
 
+    private RaftActorSnapshotMessageSupport snapshotSupport;
+
     private final BehaviorStateHolder reusableBehaviorStateHolder = new BehaviorStateHolder();
 
     public RaftActor(String id, Map<String, String> peerAddresses) {
@@ -145,7 +138,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
 
     @Override
     public void postStop() {
-        if(currentBehavior != null) {
+        if(currentBehavior.getDelegate() != null) {
             try {
                 currentBehavior.close();
             } catch (Exception e) {
@@ -192,7 +185,18 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
         handleBehaviorChange(reusableBehaviorStateHolder, getCurrentBehavior());
     }
 
-    @Override public void handleCommand(Object message) {
+    @Override
+    public void handleCommand(Object message) {
+        if(snapshotSupport == null) {
+            snapshotSupport = new RaftActorSnapshotMessageSupport(delegatingPersistenceProvider, context,
+                    currentBehavior, getRaftActorSnapshotCohort(), self());
+        }
+
+        boolean handled = snapshotSupport.handleSnapshotMessage(message);
+        if(handled) {
+            return;
+        }
+
         if (message instanceof ApplyState){
             ApplyState applyState = (ApplyState) message;
 
@@ -219,56 +223,13 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
 
             persistence().persist(applyEntries, NoopProcedure.instance());
 
-        } else if(message instanceof ApplySnapshot ) {
-            Snapshot snapshot = ((ApplySnapshot) message).getSnapshot();
-
-            if(LOG.isDebugEnabled()) {
-                LOG.debug("{}: ApplySnapshot called on Follower Actor " +
-                        "snapshotIndex:{}, snapshotTerm:{}", persistenceId(), snapshot.getLastAppliedIndex(),
-                    snapshot.getLastAppliedTerm()
-                );
-            }
-
-            applySnapshot(snapshot.getState());
-
-            //clears the followers log, sets the snapshot index to ensure adjusted-index works
-            context.setReplicatedLog(ReplicatedLogImpl.newInstance(snapshot, context, delegatingPersistenceProvider,
-                    currentBehavior));
-            context.setLastApplied(snapshot.getLastAppliedIndex());
-
         } else if (message instanceof FindLeader) {
             getSender().tell(
                 new FindLeaderReply(getLeaderAddress()),
                 getSelf()
             );
-
-        } else if (message instanceof SaveSnapshotSuccess) {
-            SaveSnapshotSuccess success = (SaveSnapshotSuccess) message;
-            LOG.info("{}: SaveSnapshotSuccess received for snapshot", persistenceId());
-
-            long sequenceNumber = success.metadata().sequenceNr();
-
-            commitSnapshot(sequenceNumber);
-
-        } else if (message instanceof SaveSnapshotFailure) {
-            SaveSnapshotFailure saveSnapshotFailure = (SaveSnapshotFailure) message;
-
-            LOG.error("{}: SaveSnapshotFailure received for snapshot Cause:",
-                    persistenceId(), saveSnapshotFailure.cause());
-
-            context.getSnapshotManager().rollback();
-
-        } else if (message instanceof CaptureSnapshot) {
-            LOG.debug("{}: CaptureSnapshot received by actor: {}", persistenceId(), message);
-
-            context.getSnapshotManager().create(createSnapshotProcedure);
-
-        } else if (message instanceof CaptureSnapshotReply) {
-            handleCaptureSnapshotReply(((CaptureSnapshotReply) message).getSnapshot());
         } else if(message instanceof GetOnDemandRaftState) {
             onGetOnDemandRaftStats();
-        } else if (message.equals(COMMIT_SNAPSHOT)) {
-            commitSnapshot(-1);
         } else {
             reusableBehaviorStateHolder.init(getCurrentBehavior());
 
@@ -504,7 +465,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
                     // Make saving Snapshot successful
                     // Committing the snapshot here would end up calling commit in the creating state which would
                     // be a state violation. That's why now we send a message to commit the snapshot.
-                    self().tell(COMMIT_SNAPSHOT, self());
+                    self().tell(RaftActorSnapshotMessageSupport.COMMIT_SNAPSHOT, self());
                 }
             });
         }
@@ -528,10 +489,6 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
         context.setPeerAddress(peerId, peerAddress);
     }
 
-    protected void commitSnapshot(long sequenceNumber) {
-        context.getSnapshotManager().commit(persistence(), sequenceNumber);
-    }
-
     /**
      * The applyState method will be called by the RaftActor when some data
      * needs to be applied to the actor's state
@@ -564,24 +521,10 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
     protected abstract void onRecoveryComplete();
 
     /**
-     * This method will be called by the RaftActor when a snapshot needs to be
-     * created. The derived actor should respond with its current state.
-     * <p/>
-     * During recovery the state that is returned by the derived actor will
-     * be passed back to it by calling the applySnapshot  method
-     *
-     * @return The current state of the actor
+     * Returns the RaftActorSnapshotCohort to participate in persistence recovery.
      */
-    protected abstract void createSnapshot();
-
-    /**
-     * This method can be called at any other point during normal
-     * operations when the derived actor is out of sync with it's peers
-     * and the only way to bring it in sync is by applying a snapshot
-     *
-     * @param snapshotBytes A snapshot of the state of the actor
-     */
-    protected abstract void applySnapshot(byte[] snapshotBytes);
+    @Nonnull
+    protected abstract RaftActorSnapshotCohort getRaftActorSnapshotCohort();
 
     /**
      * This method will be called by the RaftActor when the state of the
@@ -615,12 +558,6 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
         return peerAddress;
     }
 
-    private void handleCaptureSnapshotReply(byte[] snapshotBytes) {
-        LOG.debug("{}: CaptureSnapshotReply received by actor: snapshot size {}", persistenceId(), snapshotBytes.length);
-
-        context.getSnapshotManager().persist(persistence(), snapshotBytes, currentBehavior, context.getTotalMemory());
-    }
-
     protected boolean hasFollowers(){
         return getRaftActorContext().hasFollowers();
     }
@@ -657,14 +594,6 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
         }
     }
 
-    private class CreateSnapshotProcedure implements Procedure<Void> {
-
-        @Override
-        public void apply(Void aVoid) throws Exception {
-            createSnapshot();
-        }
-    }
-
     private static class BehaviorStateHolder {
         private RaftActorBehavior behavior;
         private String leaderId;