Use SnapshotManager
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / RaftActor.java
index aa72485187cc9143fbcf6eac5f1adb0b7815b27e..19804111ec1fcf42dc241c3448e67d07c108eb62 100644 (file)
@@ -22,7 +22,6 @@ import com.google.common.base.Optional;
 import com.google.common.base.Stopwatch;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
-import com.google.protobuf.ByteString;
 import java.io.Serializable;
 import java.util.Collection;
 import java.util.List;
@@ -40,7 +39,6 @@ import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
-import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
 import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader;
 import org.opendaylight.controller.cluster.raft.behaviors.AbstractRaftActorBehavior;
 import org.opendaylight.controller.cluster.raft.behaviors.Follower;
@@ -104,6 +102,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
                 public void apply(ApplyJournalEntries param) throws Exception {
                 }
             };
+    private static final String COMMIT_SNAPSHOT = "commit_snapshot";
 
     protected final Logger LOG = LoggerFactory.getLogger(getClass());
 
@@ -119,13 +118,13 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
      */
     private final RaftActorContextImpl context;
 
+    private final Procedure<Void> createSnapshotProcedure = new CreateSnapshotProcedure();
+
     /**
      * The in-memory journal
      */
     private ReplicatedLogImpl replicatedLog = new ReplicatedLogImpl();
 
-    private CaptureSnapshot captureSnapshot = null;
-
     private Stopwatch recoveryTimer;
 
     private int currentRecoveryBatchCount;
@@ -379,26 +378,19 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
             LOG.error("{}: SaveSnapshotFailure received for snapshot Cause:",
                     persistenceId(), saveSnapshotFailure.cause());
 
-            context.getReplicatedLog().snapshotRollback();
-
-            LOG.info("{}: Replicated Log rollbacked. Snapshot will be attempted in the next cycle." +
-                "snapshotIndex:{}, snapshotTerm:{}, log-size:{}", persistenceId(),
-                context.getReplicatedLog().getSnapshotIndex(),
-                context.getReplicatedLog().getSnapshotTerm(),
-                context.getReplicatedLog().size());
+            context.getSnapshotManager().rollback();
 
         } else if (message instanceof CaptureSnapshot) {
             LOG.debug("{}: CaptureSnapshot received by actor: {}", persistenceId(), message);
 
-            if(captureSnapshot == null) {
-                captureSnapshot = (CaptureSnapshot)message;
-                createSnapshot();
-            }
+            context.getSnapshotManager().create(createSnapshotProcedure);
 
-        } else if (message instanceof CaptureSnapshotReply){
+        } else if (message instanceof CaptureSnapshotReply) {
             handleCaptureSnapshotReply(((CaptureSnapshotReply) message).getSnapshot());
         } else if(message instanceof GetOnDemandRaftState) {
             onGetOnDemandRaftStats();
+        } else if (message.equals(COMMIT_SNAPSHOT)) {
+            commitSnapshot(-1);
         } else {
             reusableBehaviorStateHolder.init(currentBehavior);
 
@@ -416,7 +408,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
                 .currentTerm(context.getTermInformation().getCurrentTerm())
                 .inMemoryJournalDataSize(replicatedLog.dataSize())
                 .inMemoryJournalLogSize(replicatedLog.size())
-                .isSnapshotCaptureInitiated(context.isSnapshotCaptureInitiated())
+                .isSnapshotCaptureInitiated(context.getSnapshotManager().isCapturing())
                 .lastApplied(context.getLastApplied())
                 .lastIndex(replicatedLog.lastIndex())
                 .lastTerm(replicatedLog.lastTerm())
@@ -515,15 +507,8 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
                             // the state to durable storage
                             self().tell(new ApplyJournalEntries(replicatedLogEntry.getIndex()), self());
 
-                            // Check if the "real" snapshot capture has been initiated. If no then do the fake snapshot
-                            if(!context.isSnapshotCaptureInitiated()){
-                                raftContext.getReplicatedLog().snapshotPreCommit(raftContext.getLastApplied(),
-                                        raftContext.getTermInformation().getCurrentTerm());
-                                raftContext.getReplicatedLog().snapshotCommit();
-                            } else {
-                                LOG.debug("{}: Skipping fake snapshotting for {} because real snapshotting is in progress",
-                                        persistenceId(), getId());
-                            }
+                            context.getSnapshotManager().trimLog(context.getLastApplied(), currentBehavior);
+
                         } else if (clientActor != null) {
                             // Send message for replication
                             currentBehavior.handleMessage(getSelf(),
@@ -621,10 +606,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
     }
 
     protected void commitSnapshot(long sequenceNumber) {
-        context.getReplicatedLog().snapshotCommit();
-
-        // TODO: Not sure if we want to be this aggressive with trimming stuff
-        trimPersistentData(sequenceNumber);
+        context.getSnapshotManager().commit(persistence(), sequenceNumber);
     }
 
     /**
@@ -747,67 +729,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
     private void handleCaptureSnapshotReply(byte[] snapshotBytes) {
         LOG.debug("{}: CaptureSnapshotReply received by actor: snapshot size {}", persistenceId(), snapshotBytes.length);
 
-        // create a snapshot object from the state provided and save it
-        // when snapshot is saved async, SaveSnapshotSuccess is raised.
-
-        Snapshot sn = Snapshot.create(snapshotBytes,
-            context.getReplicatedLog().getFrom(captureSnapshot.getLastAppliedIndex() + 1),
-            captureSnapshot.getLastIndex(), captureSnapshot.getLastTerm(),
-            captureSnapshot.getLastAppliedIndex(), captureSnapshot.getLastAppliedTerm());
-
-        persistence().saveSnapshot(sn);
-
-        LOG.info("{}: Persisting of snapshot done:{}", persistenceId(), sn.getLogMessage());
-
-        long dataThreshold = getTotalMemory() *
-                getRaftActorContext().getConfigParams().getSnapshotDataThresholdPercentage() / 100;
-        if (context.getReplicatedLog().dataSize() > dataThreshold) {
-
-            if(LOG.isDebugEnabled()) {
-                LOG.debug("{}: dataSize {} exceeds dataThreshold {} - doing snapshotPreCommit with index {}",
-                        persistenceId(), context.getReplicatedLog().dataSize(), dataThreshold,
-                        captureSnapshot.getLastAppliedIndex());
-            }
-
-            // if memory is less, clear the log based on lastApplied.
-            // this could/should only happen if one of the followers is down
-            // as normally we keep removing from the log when its replicated to all.
-            context.getReplicatedLog().snapshotPreCommit(captureSnapshot.getLastAppliedIndex(),
-                    captureSnapshot.getLastAppliedTerm());
-
-            // Don't reset replicatedToAllIndex to -1 as this may prevent us from trimming the log after an
-            // install snapshot to a follower.
-            if(captureSnapshot.getReplicatedToAllIndex() >= 0) {
-                getCurrentBehavior().setReplicatedToAllIndex(captureSnapshot.getReplicatedToAllIndex());
-            }
-        } else if(captureSnapshot.getReplicatedToAllIndex() != -1){
-            // clear the log based on replicatedToAllIndex
-            context.getReplicatedLog().snapshotPreCommit(captureSnapshot.getReplicatedToAllIndex(),
-                    captureSnapshot.getReplicatedToAllTerm());
-
-            getCurrentBehavior().setReplicatedToAllIndex(captureSnapshot.getReplicatedToAllIndex());
-        } else {
-            // The replicatedToAllIndex was not found in the log
-            // This means that replicatedToAllIndex never moved beyond -1 or that it is already in the snapshot.
-            // In this scenario we may need to save the snapshot to the akka persistence
-            // snapshot for recovery but we do not need to do the replicated log trimming.
-            context.getReplicatedLog().snapshotPreCommit(replicatedLog.getSnapshotIndex(),
-                    replicatedLog.getSnapshotTerm());
-        }
-
-
-        LOG.info("{}: Removed in-memory snapshotted entries, adjusted snaphsotIndex: {} " +
-            "and term: {}", persistenceId(), replicatedLog.getSnapshotIndex(),
-            replicatedLog.getSnapshotTerm());
-
-        if (isLeader() && captureSnapshot.isInstallSnapshotInitiated()) {
-            // this would be call straight to the leader and won't initiate in serialization
-            currentBehavior.handleMessage(getSelf(), new SendInstallSnapshot(
-                    ByteString.copyFrom(snapshotBytes)));
-        }
-
-        captureSnapshot = null;
-        context.setSnapshotCaptureInitiated(false);
+        context.getSnapshotManager().persist(persistence(), snapshotBytes, currentBehavior, getTotalMemory());
     }
 
     protected long getTotalMemory() {
@@ -819,9 +741,9 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
     }
 
     private class ReplicatedLogImpl extends AbstractReplicatedLogImpl {
-
         private static final int DATA_SIZE_DIVIDER = 5;
-        private long dataSizeSinceLastSnapshot = 0;
+        private long dataSizeSinceLastSnapshot = 0L;
+
 
         public ReplicatedLogImpl(Snapshot snapshot) {
             super(snapshot.getLastAppliedIndex(), snapshot.getLastAppliedTerm(),
@@ -887,9 +809,8 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
                         long dataSizeForCheck = dataSize;
 
                         dataSizeSinceLastSnapshot += logEntrySize;
-                        long journalSize = lastIndex() + 1;
 
-                        if(!hasFollowers()) {
+                        if (!hasFollowers()) {
                             // When we do not have followers we do not maintain an in-memory log
                             // due to this the journalSize will never become anything close to the
                             // snapshot batch count. In fact will mostly be 1.
@@ -903,51 +824,22 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
                             // as if we were maintaining a real snapshot
                             dataSizeForCheck = dataSizeSinceLastSnapshot / DATA_SIZE_DIVIDER;
                         }
-
+                        long journalSize = replicatedLogEntry.getIndex() + 1;
                         long dataThreshold = getTotalMemory() *
-                                getRaftActorContext().getConfigParams().getSnapshotDataThresholdPercentage() / 100;
-
-                        // when a snaphsot is being taken, captureSnapshot != null
-                        if (!context.isSnapshotCaptureInitiated() &&
-                                ( journalSize % context.getConfigParams().getSnapshotBatchCount() == 0 ||
-                                        dataSizeForCheck > dataThreshold)) {
-
-                            dataSizeSinceLastSnapshot = 0;
+                                context.getConfigParams().getSnapshotDataThresholdPercentage() / 100;
 
-                            LOG.info("{}: Initiating Snapshot Capture, journalSize = {}, dataSizeForCheck = {}," +
-                                " dataThreshold = {}", persistenceId(), journalSize, dataSizeForCheck, dataThreshold);
+                        if ((journalSize % context.getConfigParams().getSnapshotBatchCount() == 0
+                                || dataSizeForCheck > dataThreshold)) {
 
-                            long lastAppliedIndex = -1;
-                            long lastAppliedTerm = -1;
+                            boolean started = context.getSnapshotManager().capture(replicatedLogEntry,
+                                    currentBehavior.getReplicatedToAllIndex());
 
-                            ReplicatedLogEntry lastAppliedEntry = get(context.getLastApplied());
-                            if (!hasFollowers()) {
-                                lastAppliedIndex = replicatedLogEntry.getIndex();
-                                lastAppliedTerm = replicatedLogEntry.getTerm();
-                            } else if (lastAppliedEntry != null) {
-                                lastAppliedIndex = lastAppliedEntry.getIndex();
-                                lastAppliedTerm = lastAppliedEntry.getTerm();
+                            if(started){
+                                dataSizeSinceLastSnapshot = 0;
                             }
 
-                            if(LOG.isDebugEnabled()) {
-                                LOG.debug("{}: Snapshot Capture logSize: {}", persistenceId(), journal.size());
-                                LOG.debug("{}: Snapshot Capture lastApplied:{} ",
-                                        persistenceId(), context.getLastApplied());
-                                LOG.debug("{}: Snapshot Capture lastAppliedIndex:{}", persistenceId(),
-                                        lastAppliedIndex);
-                                LOG.debug("{}: Snapshot Capture lastAppliedTerm:{}", persistenceId(),
-                                        lastAppliedTerm);
-                            }
-
-                            // send a CaptureSnapshot to self to make the expensive operation async.
-                            long replicatedToAllIndex = getCurrentBehavior().getReplicatedToAllIndex();
-                            ReplicatedLogEntry replicatedToAllEntry = context.getReplicatedLog().get(replicatedToAllIndex);
-                            getSelf().tell(new CaptureSnapshot(lastIndex(), lastTerm(), lastAppliedIndex, lastAppliedTerm,
-                                (replicatedToAllEntry != null ? replicatedToAllEntry.getIndex() : -1),
-                                (replicatedToAllEntry != null ? replicatedToAllEntry.getTerm() : -1)),
-                                null);
-                            context.setSnapshotCaptureInitiated(true);
                         }
+
                         if (callback != null){
                             callback.apply(replicatedLogEntry);
                         }
@@ -1051,7 +943,18 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
         @Override
         public void saveSnapshot(Object o) {
             // Make saving Snapshot successful
-            commitSnapshot(-1L);
+            // Committing the snapshot here would end up calling commit in the creating state which would
+            // be a state violation. That's why now we send a message to commit the snapshot.
+            self().tell(COMMIT_SNAPSHOT, self());
+        }
+    }
+
+
+    private class CreateSnapshotProcedure implements Procedure<Void> {
+
+        @Override
+        public void apply(Void aVoid) throws Exception {
+            createSnapshot();
         }
     }