Merge "Add missing copyright text"
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / SnapshotManager.java
index f4f936bf161b2260f9cae4f397ded3af706f2294..9a916625c9331413685d6263bfe053930b6795bf 100644 (file)
@@ -10,9 +10,9 @@ package org.opendaylight.controller.cluster.raft;
 
 import akka.japi.Procedure;
 import akka.persistence.SnapshotSelectionCriteria;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.protobuf.ByteString;
 import java.util.List;
-import org.opendaylight.controller.cluster.DataPersistenceProvider;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
@@ -21,7 +21,6 @@ import org.slf4j.Logger;
 public class SnapshotManager implements SnapshotState {
 
     private final SnapshotState IDLE = new Idle();
-    private final SnapshotState CAPTURING = new Capturing();
     private final SnapshotState PERSISTING = new Persisting();
     private final SnapshotState CREATING = new Creating();
 
@@ -37,6 +36,8 @@ public class SnapshotManager implements SnapshotState {
     private CaptureSnapshot captureSnapshot;
     private long lastSequenceNumber = -1;
 
+    private Procedure<Void> createSnapshotProcedure;
+
     public SnapshotManager(RaftActorContext context, Logger logger) {
         this.context = context;
         this.LOG = logger;
@@ -58,19 +59,13 @@ public class SnapshotManager implements SnapshotState {
     }
 
     @Override
-    public void create(Procedure<Void> callback) {
-        currentState.create(callback);
+    public void persist(byte[] snapshotBytes, RaftActorBehavior currentBehavior, long totalMemory) {
+        currentState.persist(snapshotBytes, currentBehavior, totalMemory);
     }
 
     @Override
-    public void persist(DataPersistenceProvider persistenceProvider, byte[] snapshotBytes,
-                        RaftActorBehavior currentBehavior, long totalMemory) {
-        currentState.persist(persistenceProvider, snapshotBytes, currentBehavior, totalMemory);
-    }
-
-    @Override
-    public void commit(DataPersistenceProvider persistenceProvider, long sequenceNumber) {
-        currentState.commit(persistenceProvider, sequenceNumber);
+    public void commit(long sequenceNumber) {
+        currentState.commit(sequenceNumber);
     }
 
     @Override
@@ -83,6 +78,15 @@ public class SnapshotManager implements SnapshotState {
         return currentState.trimLog(desiredTrimIndex, currentBehavior);
     }
 
+    public void setCreateSnapshotCallable(Procedure<Void> createSnapshotProcedure) {
+        this.createSnapshotProcedure = createSnapshotProcedure;
+    }
+
+    @VisibleForTesting
+    public CaptureSnapshot getCaptureSnapshot() {
+        return captureSnapshot;
+    }
+
     private boolean hasFollowers(){
         return context.getPeerAddresses().keySet().size() > 0;
     }
@@ -111,18 +115,12 @@ public class SnapshotManager implements SnapshotState {
         }
 
         @Override
-        public void create(Procedure<Void> callback) {
-            LOG.debug("create should not be called in state {}", this);
-        }
-
-        @Override
-        public void persist(DataPersistenceProvider persistenceProvider, byte[] snapshotBytes,
-                            RaftActorBehavior currentBehavior, long totalMemory) {
+        public void persist(byte[] snapshotBytes, RaftActorBehavior currentBehavior, long totalMemory) {
             LOG.debug("persist should not be called in state {}", this);
         }
 
         @Override
-        public void commit(DataPersistenceProvider persistenceProvider, long sequenceNumber) {
+        public void commit(long sequenceNumber) {
             LOG.debug("commit should not be called in state {}", this);
         }
 
@@ -192,8 +190,6 @@ public class SnapshotManager implements SnapshotState {
                     lastLogEntry.getTerm(), lastAppliedIndex, lastAppliedTerm,
                     newReplicatedToAllIndex, newReplicatedToAllTerm, unAppliedEntries, targetFollower != null);
 
-            SnapshotManager.this.currentState = CAPTURING;
-
             if(captureSnapshot.isInstallSnapshotInitiated()) {
                 LOG.info("{}: Initiating snapshot capture {} to install on {}",
                         persistenceId(), captureSnapshot, targetFollower);
@@ -205,7 +201,15 @@ public class SnapshotManager implements SnapshotState {
 
             LOG.debug("lastSequenceNumber prior to capture: {}", lastSequenceNumber);
 
-            context.getActor().tell(captureSnapshot, context.getActor());
+            SnapshotManager.this.currentState = CREATING;
+
+            try {
+                createSnapshotProcedure.apply(null);
+            } catch (Exception e) {
+                SnapshotManager.this.currentState = IDLE;
+                LOG.error("Error creating snapshot", e);
+                return false;
+            }
 
             return true;
         }
@@ -231,30 +235,6 @@ public class SnapshotManager implements SnapshotState {
         }
     }
 
-    private class Capturing extends AbstractSnapshotState {
-
-        @Override
-        public boolean isCapturing() {
-            return true;
-        }
-
-        @Override
-        public void create(Procedure<Void> callback) {
-            try {
-                callback.apply(null);
-                SnapshotManager.this.currentState = CREATING;
-            } catch (Exception e) {
-                LOG.error("Unexpected error occurred", e);
-            }
-        }
-
-        @Override
-        public String toString() {
-            return "Capturing";
-        }
-
-    }
-
     private class Creating extends AbstractSnapshotState {
 
         @Override
@@ -263,8 +243,7 @@ public class SnapshotManager implements SnapshotState {
         }
 
         @Override
-        public void persist(DataPersistenceProvider persistenceProvider, byte[] snapshotBytes,
-                            RaftActorBehavior currentBehavior, long totalMemory) {
+        public void persist(byte[] snapshotBytes, RaftActorBehavior currentBehavior, long totalMemory) {
             // create a snapshot object from the state provided and save it
             // when snapshot is saved async, SaveSnapshotSuccess is raised.
 
@@ -273,7 +252,7 @@ public class SnapshotManager implements SnapshotState {
                     captureSnapshot.getLastIndex(), captureSnapshot.getLastTerm(),
                     captureSnapshot.getLastAppliedIndex(), captureSnapshot.getLastAppliedTerm());
 
-            persistenceProvider.saveSnapshot(sn);
+            context.getPersistenceProvider().saveSnapshot(sn);
 
             LOG.info("{}: Persisting of snapshot done:{}", persistenceId(), sn.getLogMessage());
 
@@ -339,12 +318,12 @@ public class SnapshotManager implements SnapshotState {
     private class Persisting extends AbstractSnapshotState {
 
         @Override
-        public void commit(DataPersistenceProvider persistenceProvider, long sequenceNumber) {
+        public void commit(long sequenceNumber) {
             context.getReplicatedLog().snapshotCommit();
-            persistenceProvider.deleteSnapshots(new SnapshotSelectionCriteria(
+            context.getPersistenceProvider().deleteSnapshots(new SnapshotSelectionCriteria(
                     sequenceNumber - context.getConfigParams().getSnapshotBatchCount(), 43200000));
 
-            persistenceProvider.deleteMessages(lastSequenceNumber);
+            context.getPersistenceProvider().deleteMessages(lastSequenceNumber);
 
             lastSequenceNumber = -1;
             SnapshotManager.this.currentState = IDLE;