Merge "Fix checkstyle warnings in netty-threadgroup-config."
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / behaviors / AbstractLeader.java
index d85ac8ef67ded21d6140bb42512ba5fa6eb2165d..e5c5dc752d3a257e9ce2f5852bf3350b006b8116 100644 (file)
@@ -67,6 +67,16 @@ import scala.concurrent.duration.FiniteDuration;
  * set commitIndex = N (§5.3, §5.4).
  */
 public abstract class AbstractLeader extends AbstractRaftActorBehavior {
+
+    // The index of the first chunk that is sent when installing a snapshot
+    public static final int FIRST_CHUNK_INDEX = 1;
+
+    // The index that the follower should respond with if it needs the install snapshot to be reset
+    public static final int INVALID_CHUNK_INDEX = -1;
+
+    // This would be passed as the hash code of the last chunk when sending the first chunk
+    public static final int INITIAL_LAST_CHUNK_HASH_CODE = -1;
+
     protected final Map<String, FollowerLogInformation> followerToLog = new HashMap<>();
     protected final Map<String, FollowerToSnapshot> mapFollowerToSnapshot = new HashMap<>();
 
@@ -332,6 +342,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
                         "sending snapshot chunk failed, Will retry, Chunk:{}",
                     reply.getChunkIndex()
                 );
+
                 followerToSnapshot.markSendStatus(false);
             }
 
@@ -341,6 +352,12 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
                     " or Chunk Index in InstallSnapshotReply not matching {} != {}",
                 followerToSnapshot.getChunkIndex(), reply.getChunkIndex()
             );
+
+            if(reply.getChunkIndex() == INVALID_CHUNK_INDEX){
+                // Since the Follower did not find this index to be valid we should reset the follower snapshot
+                // so that Installing the snapshot can resume from the beginning
+                followerToSnapshot.reset();
+            }
         }
     }
 
@@ -539,7 +556,8 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
                         context.getReplicatedLog().getSnapshotTerm(),
                         getNextSnapshotChunk(followerId,snapshot.get()),
                         mapFollowerToSnapshot.get(followerId).incrementChunkIndex(),
-                        mapFollowerToSnapshot.get(followerId).getTotalChunks()
+                        mapFollowerToSnapshot.get(followerId).getTotalChunks(),
+                        Optional.of(mapFollowerToSnapshot.get(followerId).getLastChunkHashCode())
                     ).toSerializable(),
                     actor()
                 );
@@ -636,11 +654,11 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
         private boolean replyStatus = false;
         private int chunkIndex;
         private int totalChunks;
+        private int lastChunkHashCode = AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE;
+        private int nextChunkHashCode = AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE;
 
         public FollowerToSnapshot(ByteString snapshotBytes) {
             this.snapshotBytes = snapshotBytes;
-            replyReceivedForOffset = -1;
-            chunkIndex = 1;
             int size = snapshotBytes.size();
             totalChunks = ( size / context.getConfigParams().getSnapshotChunkSize()) +
                 ((size % context.getConfigParams().getSnapshotChunkSize()) > 0 ? 1 : 0);
@@ -648,6 +666,8 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
                 LOG.debug("Snapshot {} bytes, total chunks to send:{}",
                     size, totalChunks);
             }
+            replyReceivedForOffset = -1;
+            chunkIndex = AbstractLeader.FIRST_CHUNK_INDEX;
         }
 
         public ByteString getSnapshotBytes() {
@@ -692,6 +712,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
                 // if the chunk sent was successful
                 replyReceivedForOffset = offset;
                 replyStatus = true;
+                lastChunkHashCode = nextChunkHashCode;
             } else {
                 // if the chunk sent was failure
                 replyReceivedForOffset = offset;
@@ -715,8 +736,24 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
                 LOG.debug("length={}, offset={},size={}",
                     snapshotLength, start, size);
             }
-            return getSnapshotBytes().substring(start, start + size);
+            ByteString substring = getSnapshotBytes().substring(start, start + size);
+            nextChunkHashCode = substring.hashCode();
+            return substring;
+        }
+
+        /**
+         * reset should be called when the Follower needs to be sent the snapshot from the beginning
+         */
+        public void reset(){
+            offset = 0;
+            replyStatus = false;
+            replyReceivedForOffset = offset;
+            chunkIndex = AbstractLeader.FIRST_CHUNK_INDEX;
+            lastChunkHashCode = AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE;
+        }
 
+        public int getLastChunkHashCode() {
+            return lastChunkHashCode;
         }
     }