Bug 2948: Recovered log entries not applied after prior snapshot
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / SnapshotManager.java
index 8121f75191e624cfd8595cd727985c73e1f3d5e7..f4f936bf161b2260f9cae4f397ded3af706f2294 100644 (file)
@@ -11,6 +11,7 @@ package org.opendaylight.controller.cluster.raft;
 import akka.japi.Procedure;
 import akka.persistence.SnapshotSelectionCriteria;
 import com.google.protobuf.ByteString;
+import java.util.List;
 import org.opendaylight.controller.cluster.DataPersistenceProvider;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
@@ -19,7 +20,6 @@ import org.slf4j.Logger;
 
 public class SnapshotManager implements SnapshotState {
 
-
     private final SnapshotState IDLE = new Idle();
     private final SnapshotState CAPTURING = new Capturing();
     private final SnapshotState PERSISTING = new Persisting();
@@ -35,6 +35,7 @@ public class SnapshotManager implements SnapshotState {
 
     private SnapshotState currentState = IDLE;
     private CaptureSnapshot captureSnapshot;
+    private long lastSequenceNumber = -1;
 
     public SnapshotManager(RaftActorContext context, Logger logger) {
         this.context = context;
@@ -184,19 +185,26 @@ public class SnapshotManager implements SnapshotState {
             long newReplicatedToAllTerm = replicatedToAllTermInfoReader.getTerm();
 
             // send a CaptureSnapshot to self to make the expensive operation async.
+
+            List<ReplicatedLogEntry> unAppliedEntries = context.getReplicatedLog().getFrom(lastAppliedIndex + 1);
+
             captureSnapshot = new CaptureSnapshot(lastLogEntry.getIndex(),
                     lastLogEntry.getTerm(), lastAppliedIndex, lastAppliedTerm,
-                    newReplicatedToAllIndex, newReplicatedToAllTerm, targetFollower!=null);
+                    newReplicatedToAllIndex, newReplicatedToAllTerm, unAppliedEntries, targetFollower != null);
 
             SnapshotManager.this.currentState = CAPTURING;
 
-            if(targetFollower != null){
-                LOG.info("{}: Initiating snapshot capture {}", persistenceId(), captureSnapshot);
-            } else {
+            if(captureSnapshot.isInstallSnapshotInitiated()) {
                 LOG.info("{}: Initiating snapshot capture {} to install on {}",
                         persistenceId(), captureSnapshot, targetFollower);
+            } else {
+                LOG.info("{}: Initiating snapshot capture {}", persistenceId(), captureSnapshot);
             }
 
+            lastSequenceNumber = context.getPersistenceProvider().getLastSequenceNumber();
+
+            LOG.debug("lastSequenceNumber prior to capture: {}", lastSequenceNumber);
+
             context.getActor().tell(captureSnapshot, context.getActor());
 
             return true;
@@ -261,7 +269,7 @@ public class SnapshotManager implements SnapshotState {
             // when snapshot is saved async, SaveSnapshotSuccess is raised.
 
             Snapshot sn = Snapshot.create(snapshotBytes,
-                    context.getReplicatedLog().getFrom(captureSnapshot.getLastAppliedIndex() + 1),
+                    captureSnapshot.getUnAppliedEntries(),
                     captureSnapshot.getLastIndex(), captureSnapshot.getLastTerm(),
                     captureSnapshot.getLastAppliedIndex(), captureSnapshot.getLastAppliedTerm());
 
@@ -336,8 +344,9 @@ public class SnapshotManager implements SnapshotState {
             persistenceProvider.deleteSnapshots(new SnapshotSelectionCriteria(
                     sequenceNumber - context.getConfigParams().getSnapshotBatchCount(), 43200000));
 
-            persistenceProvider.deleteMessages(sequenceNumber);
+            persistenceProvider.deleteMessages(lastSequenceNumber);
 
+            lastSequenceNumber = -1;
             SnapshotManager.this.currentState = IDLE;
         }