Bug-2692:Real snapshots should use the replicatedToAllIndex for clearing in-mem log
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / behaviors / AbstractLeader.java
index 9e4f3b46c45b5de54458770b910b63f6c70305a8..94c38f6108eabf62ef9e41a3f484a6e2f915129d 100644 (file)
@@ -91,8 +91,6 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
 
     private Optional<ByteString> snapshot;
 
-    private long replicatedToAllIndex = -1;
-
     public AbstractLeader(RaftActorContext context) {
         super(context);
 
@@ -236,15 +234,16 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
     }
 
     private void purgeInMemoryLog() {
-        //find the lowest index across followers which has been replicated to all. -1 if there are no followers.
+        //find the lowest index across followers which has been replicated to all.
+        // lastApplied if there are no followers, so that we keep clearing the log for single-node
         // we would delete the in-mem log from that index on, in-order to minimize mem usage
         // we would also share this info thru AE with the followers so that they can delete their log entries as well.
-        long minReplicatedToAllIndex = followerToLog.isEmpty() ? -1 : Long.MAX_VALUE;
+        long minReplicatedToAllIndex = followerToLog.isEmpty() ? context.getLastApplied() : Long.MAX_VALUE;
         for (FollowerLogInformation info : followerToLog.values()) {
             minReplicatedToAllIndex = Math.min(minReplicatedToAllIndex, info.getMatchIndex());
         }
 
-        replicatedToAllIndex = fakeSnapshot(minReplicatedToAllIndex, replicatedToAllIndex);
+        super.performSnapshotWithoutCapture(minReplicatedToAllIndex);
     }
 
     @Override
@@ -472,7 +471,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
                     sendAppendEntriesToFollower(followerActor, followerNextIndex, entries, followerId);
 
                 } else if (isFollowerActive && followerNextIndex >= 0 &&
-                    leaderLastIndex >= followerNextIndex) {
+                    leaderLastIndex > followerNextIndex && !context.isSnapshotCaptureInitiated()) {
                     // if the followers next index is not present in the leaders log, and
                     // if the follower is just not starting and if leader's index is more than followers index
                     // then snapshot should be sent
@@ -507,7 +506,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
         AppendEntries appendEntries = new AppendEntries(currentTerm(), context.getId(),
             prevLogIndex(followerNextIndex),
             prevLogTerm(followerNextIndex), entries,
-            context.getCommitIndex(), replicatedToAllIndex);
+            context.getCommitIndex(), super.getReplicatedToAllIndex());
 
         if(!entries.isEmpty()) {
             LOG.debug("{}: Sending AppendEntries to follower {}: {}", context.getId(), followerId,
@@ -518,7 +517,6 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
     }
 
     /**
-     * /**
      * Install Snapshot works as follows
      * 1. Leader initiates the capture snapshot by sending a CaptureSnapshot message to actor
      * 2. RaftActor on receipt of the CaptureSnapshotReply (from Shard), stores the received snapshot in the replicated log
@@ -533,10 +531,6 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
      * @param followerNextIndex
      */
     private void initiateCaptureSnapshot(String followerId, long followerNextIndex) {
-        if(LOG.isDebugEnabled()) {
-            LOG.debug("{}: initiateCaptureSnapshot, followers {}", context.getId(), followerToLog.keySet());
-        }
-
         if (!context.getReplicatedLog().isPresent(followerNextIndex) &&
                 context.getReplicatedLog().isInSnapshot(followerNextIndex)) {
 
@@ -557,15 +551,18 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
                 if (lastAppliedEntry != null) {
                     lastAppliedIndex = lastAppliedEntry.getIndex();
                     lastAppliedTerm = lastAppliedEntry.getTerm();
-                } else if (context.getReplicatedLog().getSnapshotIndex() > -1)  {
+                } else if (context.getReplicatedLog().getSnapshotIndex() > -1) {
                     lastAppliedIndex = context.getReplicatedLog().getSnapshotIndex();
                     lastAppliedTerm = context.getReplicatedLog().getSnapshotTerm();
                 }
 
                 boolean isInstallSnapshotInitiated = true;
-                actor().tell(new CaptureSnapshot(lastIndex(), lastTerm(),
-                                lastAppliedIndex, lastAppliedTerm, isInstallSnapshotInitiated),
-                        actor());
+                long replicatedToAllIndex = super.getReplicatedToAllIndex();
+                ReplicatedLogEntry replicatedToAllEntry = context.getReplicatedLog().get(replicatedToAllIndex);
+                actor().tell(new CaptureSnapshot(lastIndex(), lastTerm(), lastAppliedIndex, lastAppliedTerm,
+                    (replicatedToAllEntry != null ? replicatedToAllEntry.getIndex() : -1),
+                    (replicatedToAllEntry != null ? replicatedToAllEntry.getTerm() : -1),
+                    isInstallSnapshotInitiated), actor());
                 context.setSnapshotCaptureInitiated(true);
             }
         }
@@ -605,8 +602,8 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
                         context.getReplicatedLog().getSnapshotIndex(),
                         context.getReplicatedLog().getSnapshotTerm(),
                         nextSnapshotChunk,
-                            followerToSnapshot.incrementChunkIndex(),
-                            followerToSnapshot.getTotalChunks(),
+                        followerToSnapshot.incrementChunkIndex(),
+                        followerToSnapshot.getTotalChunks(),
                         Optional.of(followerToSnapshot.getLastChunkHashCode())
                     ).toSerializable(),
                     actor()