Bug 6540: Refactor FollowerToSnapshot to its own class
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / behaviors / AbstractLeader.java
index b241e0a67a4f81118b1592e25bc1011e336318dc..7c439f7d5c929910c7dd04f4504deb7dc81c589f 100644 (file)
@@ -16,7 +16,6 @@ import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.protobuf.ByteString;
 import java.io.IOException;
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -73,18 +72,8 @@ import scala.concurrent.duration.FiniteDuration;
  * set commitIndex = N (§5.3, §5.4).
  */
 public abstract class AbstractLeader extends AbstractRaftActorBehavior {
-
-    // The index of the first chunk that is sent when installing a snapshot
-    public static final int FIRST_CHUNK_INDEX = 1;
-
-    // The index that the follower should respond with if it needs the install snapshot to be reset
-    public static final int INVALID_CHUNK_INDEX = -1;
-
-    // This would be passed as the hash code of the last chunk when sending the first chunk
-    public static final int INITIAL_LAST_CHUNK_HASH_CODE = -1;
-
     private final Map<String, FollowerLogInformation> followerToLog = new HashMap<>();
-    private final Map<String, FollowerToSnapshot> mapFollowerToSnapshot = new HashMap<>();
+    private final Map<String, LeaderInstallSnapshotState> mapFollowerToSnapshot = new HashMap<>();
 
     /**
      * Lookup table for request contexts based on journal index. We could use a {@link Map} here, but we really
@@ -448,7 +437,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
         LOG.debug("{}: handleInstallSnapshotReply: {}", logName(), reply);
 
         String followerId = reply.getFollowerId();
-        FollowerToSnapshot followerToSnapshot = mapFollowerToSnapshot.get(followerId);
+        LeaderInstallSnapshotState followerToSnapshot = mapFollowerToSnapshot.get(followerId);
 
         if (followerToSnapshot == null) {
             LOG.error("{}: FollowerToSnapshot not found for follower {} in InstallSnapshotReply",
@@ -526,7 +515,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
                     logName(), reply.getChunkIndex(), followerId,
                     followerToSnapshot.getChunkIndex());
 
-            if(reply.getChunkIndex() == INVALID_CHUNK_INDEX){
+            if(reply.getChunkIndex() == LeaderInstallSnapshotState.INVALID_CHUNK_INDEX){
                 // Since the Follower did not find this index to be valid we should reset the follower snapshot
                 // so that Installing the snapshot can resume from the beginning
                 followerToSnapshot.reset();
@@ -735,7 +724,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
 
                 // Note: the previous call to getNextSnapshotChunk has the side-effect of adding
                 // followerId to the followerToSnapshot map.
-                FollowerToSnapshot followerToSnapshot = mapFollowerToSnapshot.get(followerId);
+                LeaderInstallSnapshotState followerToSnapshot = mapFollowerToSnapshot.get(followerId);
 
                 int nextChunkIndex = followerToSnapshot.incrementChunkIndex();
                 Optional<ServerConfigurationPayload> serverConfig = Optional.absent();
@@ -772,9 +761,10 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
      * creates and return a ByteString chunk
      */
     private byte[] getNextSnapshotChunk(String followerId, ByteString snapshotBytes) throws IOException {
-        FollowerToSnapshot followerToSnapshot = mapFollowerToSnapshot.get(followerId);
+        LeaderInstallSnapshotState followerToSnapshot = mapFollowerToSnapshot.get(followerId);
         if (followerToSnapshot == null) {
-            followerToSnapshot = new FollowerToSnapshot(snapshotBytes);
+            followerToSnapshot = new LeaderInstallSnapshotState(snapshotBytes, context.getConfigParams().getSnapshotChunkSize(),
+                    logName());
             mapFollowerToSnapshot.put(followerId, followerToSnapshot);
         }
         byte[] nextChunk = followerToSnapshot.getNextChunk();
@@ -845,120 +835,6 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
         return minPresent != 0;
     }
 
-    /**
-     * Encapsulates the snapshot bytestring and handles the logic of sending
-     * snapshot chunks
-     */
-    protected class FollowerToSnapshot {
-        private final ByteString snapshotBytes;
-        private int offset = 0;
-        // the next snapshot chunk is sent only if the replyReceivedForOffset matches offset
-        private int replyReceivedForOffset;
-        // if replyStatus is false, the previous chunk is attempted
-        private boolean replyStatus = false;
-        private int chunkIndex;
-        private final int totalChunks;
-        private int lastChunkHashCode = AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE;
-        private int nextChunkHashCode = AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE;
-
-        public FollowerToSnapshot(ByteString snapshotBytes) {
-            this.snapshotBytes = snapshotBytes;
-            int size = snapshotBytes.size();
-            totalChunks = (size / context.getConfigParams().getSnapshotChunkSize()) +
-                (size % context.getConfigParams().getSnapshotChunkSize() > 0 ? 1 : 0);
-            if(LOG.isDebugEnabled()) {
-                LOG.debug("{}: Snapshot {} bytes, total chunks to send:{}",
-                        logName(), size, totalChunks);
-            }
-            replyReceivedForOffset = -1;
-            chunkIndex = AbstractLeader.FIRST_CHUNK_INDEX;
-        }
-
-        public ByteString getSnapshotBytes() {
-            return snapshotBytes;
-        }
-
-        public int incrementOffset() {
-            if(replyStatus) {
-                // if prev chunk failed, we would want to sent the same chunk again
-                offset = offset + context.getConfigParams().getSnapshotChunkSize();
-            }
-            return offset;
-        }
-
-        public int incrementChunkIndex() {
-            if (replyStatus) {
-                // if prev chunk failed, we would want to sent the same chunk again
-                chunkIndex =  chunkIndex + 1;
-            }
-            return chunkIndex;
-        }
-
-        public int getChunkIndex() {
-            return chunkIndex;
-        }
-
-        public int getTotalChunks() {
-            return totalChunks;
-        }
-
-        public boolean canSendNextChunk() {
-            // we only send a false if a chunk is sent but we have not received a reply yet
-            return replyReceivedForOffset == offset;
-        }
-
-        public boolean isLastChunk(int chunkIndex) {
-            return totalChunks == chunkIndex;
-        }
-
-        public void markSendStatus(boolean success) {
-            if (success) {
-                // if the chunk sent was successful
-                replyReceivedForOffset = offset;
-                replyStatus = true;
-                lastChunkHashCode = nextChunkHashCode;
-            } else {
-                // if the chunk sent was failure
-                replyReceivedForOffset = offset;
-                replyStatus = false;
-            }
-        }
-
-        public byte[] getNextChunk() {
-            int snapshotLength = getSnapshotBytes().size();
-            int start = incrementOffset();
-            int size = context.getConfigParams().getSnapshotChunkSize();
-            if (context.getConfigParams().getSnapshotChunkSize() > snapshotLength) {
-                size = snapshotLength;
-            } else if ((start + context.getConfigParams().getSnapshotChunkSize()) > snapshotLength) {
-                size = snapshotLength - start;
-            }
-
-            byte[] nextChunk = new byte[size];
-            getSnapshotBytes().copyTo(nextChunk, start, 0, size);
-            nextChunkHashCode = Arrays.hashCode(nextChunk);
-
-            LOG.debug("{}: Next chunk: total length={}, offset={}, size={}, hashCode={}", logName(),
-                    snapshotLength, start, size, nextChunkHashCode);
-            return nextChunk;
-        }
-
-        /**
-         * reset should be called when the Follower needs to be sent the snapshot from the beginning
-         */
-        public void reset(){
-            offset = 0;
-            replyStatus = false;
-            replyReceivedForOffset = offset;
-            chunkIndex = AbstractLeader.FIRST_CHUNK_INDEX;
-            lastChunkHashCode = AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE;
-        }
-
-        public int getLastChunkHashCode() {
-            return lastChunkHashCode;
-        }
-    }
-
     // called from example-actor for printing the follower-states
     public String printFollowerStates() {
         final StringBuilder sb = new StringBuilder();
@@ -982,7 +858,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
     }
 
     @VisibleForTesting
-    protected void setFollowerSnapshot(String followerId, FollowerToSnapshot snapshot) {
+    protected void setFollowerSnapshot(String followerId, LeaderInstallSnapshotState snapshot) {
         mapFollowerToSnapshot.put(followerId, snapshot);
     }