Speedup AbstractLeader.printFollowerStates()
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / behaviors / AbstractLeader.java
index d85ac8ef67ded21d6140bb42512ba5fa6eb2165d..7d45da7a64b91629cc1d9f504de47ec3fe97a94f 100644 (file)
@@ -67,6 +67,16 @@ import scala.concurrent.duration.FiniteDuration;
  * set commitIndex = N (§5.3, §5.4).
  */
 public abstract class AbstractLeader extends AbstractRaftActorBehavior {
+
+    // The index of the first chunk that is sent when installing a snapshot
+    public static final int FIRST_CHUNK_INDEX = 1;
+
+    // The index that the follower should respond with if it needs the install snapshot to be reset
+    public static final int INVALID_CHUNK_INDEX = -1;
+
+    // This would be passed as the hash code of the last chunk when sending the first chunk
+    public static final int INITIAL_LAST_CHUNK_HASH_CODE = -1;
+
     protected final Map<String, FollowerLogInformation> followerToLog = new HashMap<>();
     protected final Map<String, FollowerToSnapshot> mapFollowerToSnapshot = new HashMap<>();
 
@@ -332,6 +342,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
                         "sending snapshot chunk failed, Will retry, Chunk:{}",
                     reply.getChunkIndex()
                 );
+
                 followerToSnapshot.markSendStatus(false);
             }
 
@@ -341,6 +352,12 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
                     " or Chunk Index in InstallSnapshotReply not matching {} != {}",
                 followerToSnapshot.getChunkIndex(), reply.getChunkIndex()
             );
+
+            if(reply.getChunkIndex() == INVALID_CHUNK_INDEX){
+                // Since the Follower did not find this index to be valid we should reset the follower snapshot
+                // so that Installing the snapshot can resume from the beginning
+                followerToSnapshot.reset();
+            }
         }
     }
 
@@ -539,7 +556,8 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
                         context.getReplicatedLog().getSnapshotTerm(),
                         getNextSnapshotChunk(followerId,snapshot.get()),
                         mapFollowerToSnapshot.get(followerId).incrementChunkIndex(),
-                        mapFollowerToSnapshot.get(followerId).getTotalChunks()
+                        mapFollowerToSnapshot.get(followerId).getTotalChunks(),
+                        Optional.of(mapFollowerToSnapshot.get(followerId).getLastChunkHashCode())
                     ).toSerializable(),
                     actor()
                 );
@@ -636,11 +654,11 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
         private boolean replyStatus = false;
         private int chunkIndex;
         private int totalChunks;
+        private int lastChunkHashCode = AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE;
+        private int nextChunkHashCode = AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE;
 
         public FollowerToSnapshot(ByteString snapshotBytes) {
             this.snapshotBytes = snapshotBytes;
-            replyReceivedForOffset = -1;
-            chunkIndex = 1;
             int size = snapshotBytes.size();
             totalChunks = ( size / context.getConfigParams().getSnapshotChunkSize()) +
                 ((size % context.getConfigParams().getSnapshotChunkSize()) > 0 ? 1 : 0);
@@ -648,6 +666,8 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
                 LOG.debug("Snapshot {} bytes, total chunks to send:{}",
                     size, totalChunks);
             }
+            replyReceivedForOffset = -1;
+            chunkIndex = AbstractLeader.FIRST_CHUNK_INDEX;
         }
 
         public ByteString getSnapshotBytes() {
@@ -692,6 +712,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
                 // if the chunk sent was successful
                 replyReceivedForOffset = offset;
                 replyStatus = true;
+                lastChunkHashCode = nextChunkHashCode;
             } else {
                 // if the chunk sent was failure
                 replyReceivedForOffset = offset;
@@ -715,20 +736,42 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
                 LOG.debug("length={}, offset={},size={}",
                     snapshotLength, start, size);
             }
-            return getSnapshotBytes().substring(start, start + size);
+            ByteString substring = getSnapshotBytes().substring(start, start + size);
+            nextChunkHashCode = substring.hashCode();
+            return substring;
+        }
 
+        /**
+         * reset should be called when the Follower needs to be sent the snapshot from the beginning
+         */
+        public void reset(){
+            offset = 0;
+            replyStatus = false;
+            replyReceivedForOffset = offset;
+            chunkIndex = AbstractLeader.FIRST_CHUNK_INDEX;
+            lastChunkHashCode = AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE;
+        }
+
+        public int getLastChunkHashCode() {
+            return lastChunkHashCode;
         }
     }
 
     // called from example-actor for printing the follower-states
     public String printFollowerStates() {
-        StringBuilder sb = new StringBuilder();
-        for(FollowerLogInformation followerLogInformation : followerToLog.values()) {
-            boolean isFollowerActive = followerLogInformation.isFollowerActive();
-            sb.append("{"+followerLogInformation.getId() + " state:" + isFollowerActive + "},");
+        final StringBuilder sb = new StringBuilder();
 
+        sb.append('[');
+        for (FollowerLogInformation followerLogInformation : followerToLog.values()) {
+            sb.append('{');
+            sb.append(followerLogInformation.getId());
+            sb.append(" state:");
+            sb.append(followerLogInformation.isFollowerActive());
+            sb.append("},");
         }
-        return "[" + sb.toString() + "]";
+        sb.append(']');
+
+        return sb.toString();
     }
 
     @VisibleForTesting