Fix delete snapshots criteria
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / RaftActorSnapshotMessageSupport.java
index 8b6871174180d8d372bcc2eb075ebef045ba6c3d..5173dc89f7c6b114ab857eb1970a2793b3c2f2e8 100644 (file)
@@ -8,7 +8,6 @@
 package org.opendaylight.controller.cluster.raft;
 
 import akka.actor.ActorRef;
-import akka.japi.Procedure;
 import akka.persistence.SaveSnapshotFailure;
 import akka.persistence.SaveSnapshotSuccess;
 import com.google.common.annotations.VisibleForTesting;
@@ -29,25 +28,16 @@ import scala.concurrent.duration.Duration;
  * @author Thomas Pantelis
  */
 class RaftActorSnapshotMessageSupport {
-    static final String COMMIT_SNAPSHOT = "commit_snapshot";
-
-    private final RaftActorContext context;
-    private final RaftActorSnapshotCohort cohort;
-    private final Logger log;
-
-    private final Procedure<Void> createSnapshotProcedure = new Procedure<Void>() {
+    static final Object COMMIT_SNAPSHOT = new Object() {
         @Override
-        public void apply(Void notUsed) {
-            cohort.createSnapshot(context.getActor());
+        public String toString() {
+            return "commit_snapshot";
         }
     };
 
-    private final Procedure<byte[]> applySnapshotProcedure = new Procedure<byte[]>() {
-        @Override
-        public void apply(byte[] state) {
-            cohort.applySnapshot(state);
-        }
-    };
+    private final RaftActorContext context;
+    private final RaftActorSnapshotCohort cohort;
+    private final Logger log;
 
     private Duration snapshotReplyActorTimeout = Duration.create(30, TimeUnit.SECONDS);
 
@@ -56,8 +46,8 @@ class RaftActorSnapshotMessageSupport {
         this.cohort = cohort;
         this.log = context.getLogger();
 
-        context.getSnapshotManager().setCreateSnapshotCallable(createSnapshotProcedure);
-        context.getSnapshotManager().setApplySnapshotProcedure(applySnapshotProcedure);
+        context.getSnapshotManager().setCreateSnapshotRunnable(() -> cohort.createSnapshot(context.getActor()));
+        context.getSnapshotManager().setApplySnapshotConsumer(cohort::applySnapshot);
     }
 
     boolean handleSnapshotMessage(Object message, ActorRef sender) {
@@ -69,8 +59,8 @@ class RaftActorSnapshotMessageSupport {
             onSaveSnapshotFailure((SaveSnapshotFailure) message);
         } else if (message instanceof CaptureSnapshotReply) {
             onCaptureSnapshotReply(((CaptureSnapshotReply) message).getSnapshot());
-        } else if (message.equals(COMMIT_SNAPSHOT)) {
-            context.getSnapshotManager().commit(-1);
+        } else if (COMMIT_SNAPSHOT.equals(message)) {
+            context.getSnapshotManager().commit(-1, -1);
         } else if (message instanceof GetSnapshot) {
             onGetSnapshot(sender);
         } else {
@@ -94,16 +84,15 @@ class RaftActorSnapshotMessageSupport {
     }
 
     private void onSaveSnapshotSuccess(SaveSnapshotSuccess success) {
-        log.info("{}: SaveSnapshotSuccess received for snapshot", context.getId());
-
         long sequenceNumber = success.metadata().sequenceNr();
 
-        context.getSnapshotManager().commit(sequenceNumber);
+        log.info("{}: SaveSnapshotSuccess received for snapshot, sequenceNr: {}", context.getId(), sequenceNumber);
+
+        context.getSnapshotManager().commit(sequenceNumber, success.metadata().timestamp());
     }
 
     private void onApplySnapshot(ApplySnapshot message) {
-        log.info("{}: Applying snapshot on follower with snapshotIndex: {}, snapshotTerm: {}", context.getId(),
-                message.getSnapshot().getLastAppliedIndex(), message.getSnapshot().getLastAppliedTerm());
+        log.info("{}: Applying snapshot on follower:  {}", context.getId(), message.getSnapshot());
 
         context.getSnapshotManager().apply(message);
     }