Bug-2692:Avoid fake snapshot during initiate snapshot
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / behaviors / AbstractLeader.java
index da1627b98e7e4e8204385914795300ce073a71ad..3336cbd9bec1bbc9feaac98610dd8bea85a85a6f 100644 (file)
@@ -93,6 +93,8 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
 
     private Optional<ByteString> snapshot;
 
+    private long replicatedToAllIndex = -1;
+
     public AbstractLeader(RaftActorContext context) {
         super(context);
 
@@ -226,9 +228,25 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
             applyLogToStateMachine(context.getCommitIndex());
         }
 
+        if (!context.isSnapshotCaptureInitiated()) {
+            purgeInMemoryLog();
+        }
+
         return this;
     }
 
+    private void purgeInMemoryLog() {
+        //find the lowest index across followers which has been replicated to all. -1 if there are no followers.
+        // we would delete the in-mem log from that index on, in-order to minimize mem usage
+        // we would also share this info thru AE with the followers so that they can delete their log entries as well.
+        long minReplicatedToAllIndex = followerToLog.isEmpty() ? -1 : Long.MAX_VALUE;
+        for (FollowerLogInformation info : followerToLog.values()) {
+            minReplicatedToAllIndex = Math.min(minReplicatedToAllIndex, info.getMatchIndex());
+        }
+
+        replicatedToAllIndex = fakeSnapshot(minReplicatedToAllIndex, replicatedToAllIndex);
+    }
+
     @Override
     protected ClientRequestTracker removeClientRequestTracker(long logIndex) {
         final Iterator<ClientRequestTracker> it = trackerList.iterator();
@@ -276,6 +294,9 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
             // set currentTerm = T, convert to follower (ยง5.1)
             // This applies to all RPC messages and responses
             if (rpc.getTerm() > context.getTermInformation().getCurrentTerm()) {
+                LOG.debug("{}: Term {} in \"{}\" message is greater than leader's term {}", context.getId(),
+                        rpc.getTerm(), rpc, context.getTermInformation().getCurrentTerm());
+
                 context.getTermInformation().updateAndPersist(rpc.getTerm(), null);
 
                 return switchBehavior(new Follower(context));
@@ -312,12 +333,17 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
     private void handleInstallSnapshotReply(InstallSnapshotReply reply) {
         String followerId = reply.getFollowerId();
         FollowerToSnapshot followerToSnapshot = mapFollowerToSnapshot.get(followerId);
+
+        if (followerToSnapshot == null) {
+            LOG.error("{}: FollowerId {} in InstallSnapshotReply not known to Leader",
+                    context.getId(), followerId);
+            return;
+        }
+
         FollowerLogInformation followerLogInformation = followerToLog.get(followerId);
         followerLogInformation.markFollowerActive();
 
-        if (followerToSnapshot != null &&
-            followerToSnapshot.getChunkIndex() == reply.getChunkIndex()) {
-
+        if (followerToSnapshot.getChunkIndex() == reply.getChunkIndex()) {
             if (reply.isSuccess()) {
                 if(followerToSnapshot.isLastChunk(reply.getChunkIndex())) {
                     //this was the last chunk reply
@@ -355,12 +381,10 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
 
                 followerToSnapshot.markSendStatus(false);
             }
-
         } else {
-            LOG.error("{}: FollowerId in InstallSnapshotReply not known to Leader" +
-                    " or Chunk Index in InstallSnapshotReply not matching {} != {}",
-                    context.getId(), followerToSnapshot.getChunkIndex(), reply.getChunkIndex()
-            );
+            LOG.error("{}: Chunk index {} in InstallSnapshotReply from follower {} does not match expected index {}",
+                    context.getId(), reply.getChunkIndex(), followerId,
+                    followerToSnapshot.getChunkIndex());
 
             if(reply.getChunkIndex() == INVALID_CHUNK_INDEX){
                 // Since the Follower did not find this index to be valid we should reset the follower snapshot
@@ -395,6 +419,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
 
     private void sendAppendEntries() {
         // Send an AppendEntries to all followers
+
         for (Entry<String, FollowerLogInformation> e : followerToLog.entrySet()) {
             final String followerId = e.getKey();
             ActorSelection followerActor = context.getPeerActorSelection(followerId);
@@ -404,14 +429,15 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
                 long followerNextIndex = followerLogInformation.getNextIndex();
                 boolean isFollowerActive = followerLogInformation.isFollowerActive();
 
-                if (mapFollowerToSnapshot.get(followerId) != null) {
+                FollowerToSnapshot followerToSnapshot = mapFollowerToSnapshot.get(followerId);
+                if (followerToSnapshot != null) {
                     // if install snapshot is in process , then sent next chunk if possible
-                    if (isFollowerActive && mapFollowerToSnapshot.get(followerId).canSendNextChunk()) {
+                    if (isFollowerActive && followerToSnapshot.canSendNextChunk()) {
                         sendSnapshotChunk(followerActor, followerId);
                     } else {
                         // we send a heartbeat even if we have not received a reply for the last chunk
                         sendAppendEntriesToFollower(followerActor, followerNextIndex,
-                            Collections.<ReplicatedLogEntry>emptyList());
+                            Collections.<ReplicatedLogEntry>emptyList(), followerId);
                     }
 
                 } else {
@@ -419,8 +445,13 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
                     long leaderSnapShotIndex = context.getReplicatedLog().getSnapshotIndex();
                     final List<ReplicatedLogEntry> entries;
 
-                    if (isFollowerActive &&
-                        context.getReplicatedLog().isPresent(followerNextIndex)) {
+                    LOG.debug("{}: Checking sendAppendEntries for {}, leaderLastIndex: {}, leaderSnapShotIndex: {}",
+                            context.getId(), leaderLastIndex, leaderSnapShotIndex);
+
+                    if (isFollowerActive && context.getReplicatedLog().isPresent(followerNextIndex)) {
+                        LOG.debug("{}: sendAppendEntries: {} is present for {}", context.getId(),
+                                followerNextIndex, followerId);
+
                         // FIXME : Sending one entry at a time
                         entries = context.getReplicatedLog().getFrom(followerNextIndex, 1);
 
@@ -447,22 +478,25 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
                         entries =  Collections.<ReplicatedLogEntry>emptyList();
                     }
 
-                    sendAppendEntriesToFollower(followerActor, followerNextIndex, entries);
-
+                    sendAppendEntriesToFollower(followerActor, followerNextIndex, entries, followerId);
                 }
             }
         }
     }
 
     private void sendAppendEntriesToFollower(ActorSelection followerActor, long followerNextIndex,
-        List<ReplicatedLogEntry> entries) {
-        followerActor.tell(
-            new AppendEntries(currentTerm(), context.getId(),
-                prevLogIndex(followerNextIndex),
-                prevLogTerm(followerNextIndex), entries,
-                context.getCommitIndex()).toSerializable(),
-            actor()
-        );
+        List<ReplicatedLogEntry> entries, String followerId) {
+        AppendEntries appendEntries = new AppendEntries(currentTerm(), context.getId(),
+            prevLogIndex(followerNextIndex),
+            prevLogTerm(followerNextIndex), entries,
+            context.getCommitIndex(), replicatedToAllIndex);
+
+        if(!entries.isEmpty()) {
+            LOG.debug("{}: Sending AppendEntries to follower {}: {}", context.getId(), followerId,
+                    appendEntries);
+        }
+
+        followerActor.tell(appendEntries.toSerializable(), actor());
     }
 
     /**
@@ -482,6 +516,10 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
      *
      */
     private void installSnapshotIfNeeded() {
+        if(LOG.isDebugEnabled()) {
+            LOG.debug("{}: installSnapshotIfNeeded, followers {}", context.getId(), followerToLog.keySet());
+        }
+
         for (Entry<String, FollowerLogInformation> e : followerToLog.entrySet()) {
             final ActorSelection followerActor = context.getPeerActorSelection(e.getKey());
 
@@ -489,14 +527,14 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
                 long nextIndex = e.getValue().getNextIndex();
 
                 if (!context.getReplicatedLog().isPresent(nextIndex) &&
-                    context.getReplicatedLog().isInSnapshot(nextIndex)) {
+                        context.getReplicatedLog().isInSnapshot(nextIndex)) {
                     LOG.info("{}: {} follower needs a snapshot install", context.getId(), e.getKey());
                     if (snapshot.isPresent()) {
                         // if a snapshot is present in the memory, most likely another install is in progress
                         // no need to capture snapshot
                         sendSnapshotChunk(followerActor, e.getKey());
 
-                    } else {
+                    } else if (!context.isSnapshotCaptureInitiated()) {
                         initiateCaptureSnapshot();
                         //we just need 1 follower who would need snapshot to be installed.
                         // when we have the snapshot captured, we would again check (in SendInstallSnapshot)
@@ -529,6 +567,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
         actor().tell(new CaptureSnapshot(lastIndex(), lastTerm(),
                 lastAppliedIndex, lastAppliedTerm, isInstallSnapshotInitiated),
             actor());
+        context.setSnapshotCaptureInitiated(true);
     }
 
 
@@ -554,21 +593,27 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
     private void sendSnapshotChunk(ActorSelection followerActor, String followerId) {
         try {
             if (snapshot.isPresent()) {
+                ByteString nextSnapshotChunk = getNextSnapshotChunk(followerId,snapshot.get());
+
+                // Note: the previous call to getNextSnapshotChunk has the side-effect of adding
+                // followerId to the followerToSnapshot map.
+                FollowerToSnapshot followerToSnapshot = mapFollowerToSnapshot.get(followerId);
+
                 followerActor.tell(
                     new InstallSnapshot(currentTerm(), context.getId(),
                         context.getReplicatedLog().getSnapshotIndex(),
                         context.getReplicatedLog().getSnapshotTerm(),
-                        getNextSnapshotChunk(followerId,snapshot.get()),
-                        mapFollowerToSnapshot.get(followerId).incrementChunkIndex(),
-                        mapFollowerToSnapshot.get(followerId).getTotalChunks(),
-                        Optional.of(mapFollowerToSnapshot.get(followerId).getLastChunkHashCode())
+                        nextSnapshotChunk,
+                            followerToSnapshot.incrementChunkIndex(),
+                            followerToSnapshot.getTotalChunks(),
+                        Optional.of(followerToSnapshot.getLastChunkHashCode())
                     ).toSerializable(),
                     actor()
                 );
                 LOG.info("{}: InstallSnapshot sent to follower {}, Chunk: {}/{}",
                         context.getId(), followerActor.path(),
-                        mapFollowerToSnapshot.get(followerId).getChunkIndex(),
-                        mapFollowerToSnapshot.get(followerId).getTotalChunks());
+                        followerToSnapshot.getChunkIndex(),
+                        followerToSnapshot.getTotalChunks());
             }
         } catch (IOException e) {
             LOG.error(e, "{}: InstallSnapshot failed for Leader.", context.getId());