Bug 6540: Refactor FollowerToSnapshot to its own class 13/45513/2
authorTom Pantelis <tpanteli@brocade.com>
Fri, 9 Sep 2016 18:10:47 +0000 (14:10 -0400)
committerTom Pantelis <tpanteli@brocade.com>
Sat, 24 Sep 2016 12:07:19 +0000 (12:07 +0000)
Refactored FollowerToSnapshot to its own class and renamed to
LeaderInstallSnapshotState. This will facilitate subsequent patches.

Change-Id: Ie2540ddce1869a9972c8f3d547b0567c3d663aff
Signed-off-by: Tom Pantelis <tpanteli@brocade.com>
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderInstallSnapshotState.java [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/SnapshotTracker.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/SnapshotTrackerTest.java

index b241e0a..7c439f7 100644 (file)
@@ -16,7 +16,6 @@ import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.protobuf.ByteString;
 import java.io.IOException;
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -73,18 +72,8 @@ import scala.concurrent.duration.FiniteDuration;
  * set commitIndex = N (§5.3, §5.4).
  */
 public abstract class AbstractLeader extends AbstractRaftActorBehavior {
-
-    // The index of the first chunk that is sent when installing a snapshot
-    public static final int FIRST_CHUNK_INDEX = 1;
-
-    // The index that the follower should respond with if it needs the install snapshot to be reset
-    public static final int INVALID_CHUNK_INDEX = -1;
-
-    // This would be passed as the hash code of the last chunk when sending the first chunk
-    public static final int INITIAL_LAST_CHUNK_HASH_CODE = -1;
-
     private final Map<String, FollowerLogInformation> followerToLog = new HashMap<>();
-    private final Map<String, FollowerToSnapshot> mapFollowerToSnapshot = new HashMap<>();
+    private final Map<String, LeaderInstallSnapshotState> mapFollowerToSnapshot = new HashMap<>();
 
     /**
      * Lookup table for request contexts based on journal index. We could use a {@link Map} here, but we really
@@ -448,7 +437,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
         LOG.debug("{}: handleInstallSnapshotReply: {}", logName(), reply);
 
         String followerId = reply.getFollowerId();
-        FollowerToSnapshot followerToSnapshot = mapFollowerToSnapshot.get(followerId);
+        LeaderInstallSnapshotState followerToSnapshot = mapFollowerToSnapshot.get(followerId);
 
         if (followerToSnapshot == null) {
             LOG.error("{}: FollowerToSnapshot not found for follower {} in InstallSnapshotReply",
@@ -526,7 +515,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
                     logName(), reply.getChunkIndex(), followerId,
                     followerToSnapshot.getChunkIndex());
 
-            if(reply.getChunkIndex() == INVALID_CHUNK_INDEX){
+            if(reply.getChunkIndex() == LeaderInstallSnapshotState.INVALID_CHUNK_INDEX){
                 // Since the Follower did not find this index to be valid we should reset the follower snapshot
                 // so that Installing the snapshot can resume from the beginning
                 followerToSnapshot.reset();
@@ -735,7 +724,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
 
                 // Note: the previous call to getNextSnapshotChunk has the side-effect of adding
                 // followerId to the followerToSnapshot map.
-                FollowerToSnapshot followerToSnapshot = mapFollowerToSnapshot.get(followerId);
+                LeaderInstallSnapshotState followerToSnapshot = mapFollowerToSnapshot.get(followerId);
 
                 int nextChunkIndex = followerToSnapshot.incrementChunkIndex();
                 Optional<ServerConfigurationPayload> serverConfig = Optional.absent();
@@ -772,9 +761,10 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
      * creates and return a ByteString chunk
      */
     private byte[] getNextSnapshotChunk(String followerId, ByteString snapshotBytes) throws IOException {
-        FollowerToSnapshot followerToSnapshot = mapFollowerToSnapshot.get(followerId);
+        LeaderInstallSnapshotState followerToSnapshot = mapFollowerToSnapshot.get(followerId);
         if (followerToSnapshot == null) {
-            followerToSnapshot = new FollowerToSnapshot(snapshotBytes);
+            followerToSnapshot = new LeaderInstallSnapshotState(snapshotBytes, context.getConfigParams().getSnapshotChunkSize(),
+                    logName());
             mapFollowerToSnapshot.put(followerId, followerToSnapshot);
         }
         byte[] nextChunk = followerToSnapshot.getNextChunk();
@@ -845,120 +835,6 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
         return minPresent != 0;
     }
 
-    /**
-     * Encapsulates the snapshot bytestring and handles the logic of sending
-     * snapshot chunks
-     */
-    protected class FollowerToSnapshot {
-        private final ByteString snapshotBytes;
-        private int offset = 0;
-        // the next snapshot chunk is sent only if the replyReceivedForOffset matches offset
-        private int replyReceivedForOffset;
-        // if replyStatus is false, the previous chunk is attempted
-        private boolean replyStatus = false;
-        private int chunkIndex;
-        private final int totalChunks;
-        private int lastChunkHashCode = AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE;
-        private int nextChunkHashCode = AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE;
-
-        public FollowerToSnapshot(ByteString snapshotBytes) {
-            this.snapshotBytes = snapshotBytes;
-            int size = snapshotBytes.size();
-            totalChunks = (size / context.getConfigParams().getSnapshotChunkSize()) +
-                (size % context.getConfigParams().getSnapshotChunkSize() > 0 ? 1 : 0);
-            if(LOG.isDebugEnabled()) {
-                LOG.debug("{}: Snapshot {} bytes, total chunks to send:{}",
-                        logName(), size, totalChunks);
-            }
-            replyReceivedForOffset = -1;
-            chunkIndex = AbstractLeader.FIRST_CHUNK_INDEX;
-        }
-
-        public ByteString getSnapshotBytes() {
-            return snapshotBytes;
-        }
-
-        public int incrementOffset() {
-            if(replyStatus) {
-                // if prev chunk failed, we would want to sent the same chunk again
-                offset = offset + context.getConfigParams().getSnapshotChunkSize();
-            }
-            return offset;
-        }
-
-        public int incrementChunkIndex() {
-            if (replyStatus) {
-                // if prev chunk failed, we would want to sent the same chunk again
-                chunkIndex =  chunkIndex + 1;
-            }
-            return chunkIndex;
-        }
-
-        public int getChunkIndex() {
-            return chunkIndex;
-        }
-
-        public int getTotalChunks() {
-            return totalChunks;
-        }
-
-        public boolean canSendNextChunk() {
-            // we only send a false if a chunk is sent but we have not received a reply yet
-            return replyReceivedForOffset == offset;
-        }
-
-        public boolean isLastChunk(int chunkIndex) {
-            return totalChunks == chunkIndex;
-        }
-
-        public void markSendStatus(boolean success) {
-            if (success) {
-                // if the chunk sent was successful
-                replyReceivedForOffset = offset;
-                replyStatus = true;
-                lastChunkHashCode = nextChunkHashCode;
-            } else {
-                // if the chunk sent was failure
-                replyReceivedForOffset = offset;
-                replyStatus = false;
-            }
-        }
-
-        public byte[] getNextChunk() {
-            int snapshotLength = getSnapshotBytes().size();
-            int start = incrementOffset();
-            int size = context.getConfigParams().getSnapshotChunkSize();
-            if (context.getConfigParams().getSnapshotChunkSize() > snapshotLength) {
-                size = snapshotLength;
-            } else if ((start + context.getConfigParams().getSnapshotChunkSize()) > snapshotLength) {
-                size = snapshotLength - start;
-            }
-
-            byte[] nextChunk = new byte[size];
-            getSnapshotBytes().copyTo(nextChunk, start, 0, size);
-            nextChunkHashCode = Arrays.hashCode(nextChunk);
-
-            LOG.debug("{}: Next chunk: total length={}, offset={}, size={}, hashCode={}", logName(),
-                    snapshotLength, start, size, nextChunkHashCode);
-            return nextChunk;
-        }
-
-        /**
-         * reset should be called when the Follower needs to be sent the snapshot from the beginning
-         */
-        public void reset(){
-            offset = 0;
-            replyStatus = false;
-            replyReceivedForOffset = offset;
-            chunkIndex = AbstractLeader.FIRST_CHUNK_INDEX;
-            lastChunkHashCode = AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE;
-        }
-
-        public int getLastChunkHashCode() {
-            return lastChunkHashCode;
-        }
-    }
-
     // called from example-actor for printing the follower-states
     public String printFollowerStates() {
         final StringBuilder sb = new StringBuilder();
@@ -982,7 +858,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
     }
 
     @VisibleForTesting
-    protected void setFollowerSnapshot(String followerId, FollowerToSnapshot snapshot) {
+    protected void setFollowerSnapshot(String followerId, LeaderInstallSnapshotState snapshot) {
         mapFollowerToSnapshot.put(followerId, snapshot);
     }
 
diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderInstallSnapshotState.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderInstallSnapshotState.java
new file mode 100644 (file)
index 0000000..100c99b
--- /dev/null
@@ -0,0 +1,140 @@
+/*
+ * Copyright (c) 2016 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft.behaviors;
+
+import com.google.protobuf.ByteString;
+import java.util.Arrays;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Encapsulates the leader state and logic for sending snapshot chunks to a follower.
+ */
+class LeaderInstallSnapshotState {
+    private static final Logger LOG = LoggerFactory.getLogger(LeaderInstallSnapshotState.class);
+
+    // The index of the first chunk that is sent when installing a snapshot
+    static final int FIRST_CHUNK_INDEX = 1;
+
+    // The index that the follower should respond with if it needs the install snapshot to be reset
+    static final int INVALID_CHUNK_INDEX = -1;
+
+    // This would be passed as the hash code of the last chunk when sending the first chunk
+    static final int INITIAL_LAST_CHUNK_HASH_CODE = -1;
+
+    private int snapshotChunkSize;
+    private final ByteString snapshotBytes;
+    private final String logName;
+    private int offset = 0;
+    // the next snapshot chunk is sent only if the replyReceivedForOffset matches offset
+    private int replyReceivedForOffset;
+    // if replyStatus is false, the previous chunk is attempted
+    private boolean replyStatus = false;
+    private int chunkIndex;
+    private final int totalChunks;
+    private int lastChunkHashCode = INITIAL_LAST_CHUNK_HASH_CODE;
+    private int nextChunkHashCode = INITIAL_LAST_CHUNK_HASH_CODE;
+
+    public LeaderInstallSnapshotState(ByteString snapshotBytes, int snapshotChunkSize, String logName) {
+        this.snapshotChunkSize = snapshotChunkSize;
+        this.snapshotBytes = snapshotBytes;
+        this.logName = logName;
+        int size = snapshotBytes.size();
+        totalChunks = (size / snapshotChunkSize) +
+                (size % snapshotChunkSize > 0 ? 1 : 0);
+
+        LOG.debug("{}: Snapshot {} bytes, total chunks to send: {}", logName, size, totalChunks);
+
+        replyReceivedForOffset = -1;
+        chunkIndex = FIRST_CHUNK_INDEX;
+    }
+
+    ByteString getSnapshotBytes() {
+        return snapshotBytes;
+    }
+
+    int incrementOffset() {
+        if(replyStatus) {
+            // if prev chunk failed, we would want to sent the same chunk again
+            offset = offset + snapshotChunkSize;
+        }
+        return offset;
+    }
+
+    int incrementChunkIndex() {
+        if (replyStatus) {
+            // if prev chunk failed, we would want to sent the same chunk again
+            chunkIndex =  chunkIndex + 1;
+        }
+        return chunkIndex;
+    }
+
+    int getChunkIndex() {
+        return chunkIndex;
+    }
+
+    int getTotalChunks() {
+        return totalChunks;
+    }
+
+    boolean canSendNextChunk() {
+        // we only send a false if a chunk is sent but we have not received a reply yet
+        return replyReceivedForOffset == offset;
+    }
+
+    boolean isLastChunk(int index) {
+        return totalChunks == index;
+    }
+
+    void markSendStatus(boolean success) {
+        if (success) {
+            // if the chunk sent was successful
+            replyReceivedForOffset = offset;
+            replyStatus = true;
+            lastChunkHashCode = nextChunkHashCode;
+        } else {
+            // if the chunk sent was failure
+            replyReceivedForOffset = offset;
+            replyStatus = false;
+        }
+    }
+
+    byte[] getNextChunk() {
+        int snapshotLength = getSnapshotBytes().size();
+        int start = incrementOffset();
+        int size = snapshotChunkSize;
+        if (snapshotChunkSize > snapshotLength) {
+            size = snapshotLength;
+        } else if ((start + snapshotChunkSize) > snapshotLength) {
+            size = snapshotLength - start;
+        }
+
+        byte[] nextChunk = new byte[size];
+        getSnapshotBytes().copyTo(nextChunk, start, 0, size);
+        nextChunkHashCode = Arrays.hashCode(nextChunk);
+
+        LOG.debug("{}: Next chunk: total length={}, offset={}, size={}, hashCode={}", logName,
+                snapshotLength, start, size, nextChunkHashCode);
+        return nextChunk;
+    }
+
+    /**
+     * reset should be called when the Follower needs to be sent the snapshot from the beginning
+     */
+    void reset(){
+        offset = 0;
+        replyStatus = false;
+        replyReceivedForOffset = offset;
+        chunkIndex = FIRST_CHUNK_INDEX;
+        lastChunkHashCode = INITIAL_LAST_CHUNK_HASH_CODE;
+    }
+
+    int getLastChunkHashCode() {
+        return lastChunkHashCode;
+    }
+}
index 9249142..fadca3b 100644 (file)
@@ -22,9 +22,9 @@ public class SnapshotTracker {
     private final int totalChunks;
     private final String leaderId;
     private ByteString collectedChunks = ByteString.EMPTY;
-    private int lastChunkIndex = AbstractLeader.FIRST_CHUNK_INDEX - 1;
+    private int lastChunkIndex = LeaderInstallSnapshotState.FIRST_CHUNK_INDEX - 1;
     private boolean sealed = false;
-    private int lastChunkHashCode = AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE;
+    private int lastChunkHashCode = LeaderInstallSnapshotState.INITIAL_LAST_CHUNK_HASH_CODE;
 
     SnapshotTracker(Logger LOG, int totalChunks, String leaderId) {
         this.LOG = LOG;
index eb81e51..4127879 100644 (file)
@@ -51,7 +51,6 @@ import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow;
-import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader.FollowerToSnapshot;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
@@ -585,7 +584,8 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
         ByteString bs = toByteString(leadersSnapshot);
         leader.setSnapshot(Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
                 commitIndex, snapshotTerm, commitIndex, snapshotTerm));
-        FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
+        LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState(bs,
+                actorContext.getConfigParams().getSnapshotChunkSize(), leader.logName());
         leader.setFollowerSnapshot(FOLLOWER_ID, fts);
 
         //send first chunk and no InstallSnapshotReply received yet
@@ -918,7 +918,8 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
         ByteString bs = toByteString(leadersSnapshot);
         leader.setSnapshot(Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
                 commitIndex, snapshotTerm, commitIndex, snapshotTerm));
-        FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
+        LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState(bs,
+                actorContext.getConfigParams().getSnapshotChunkSize(), leader.logName());
         leader.setFollowerSnapshot(FOLLOWER_ID, fts);
         while(!fts.isLastChunk(fts.getChunkIndex())) {
             fts.getNextChunk();
@@ -1128,7 +1129,8 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
 
         assertEquals(1, installSnapshot.getChunkIndex());
         assertEquals(3, installSnapshot.getTotalChunks());
-        assertEquals(AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE, installSnapshot.getLastChunkHashCode().get().intValue());
+        assertEquals(LeaderInstallSnapshotState.INITIAL_LAST_CHUNK_HASH_CODE,
+                installSnapshot.getLastChunkHashCode().get().intValue());
 
         int hashCode = Arrays.hashCode(installSnapshot.getData());
 
@@ -1167,7 +1169,8 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
         ByteString bs = toByteString(leadersSnapshot);
         byte[] barray = bs.toByteArray();
 
-        FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
+        LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState(bs,
+                actorContext.getConfigParams().getSnapshotChunkSize(), leader.logName());
         leader.setFollowerSnapshot(FOLLOWER_ID, fts);
 
         assertEquals(bs.size(), barray.length);
index c1bc215..6816c95 100644 (file)
@@ -71,7 +71,7 @@ public class SnapshotTrackerTest {
         SnapshotTracker tracker3 = new SnapshotTracker(logger, 2, "leader");
 
         try {
-            tracker3.addChunk(AbstractLeader.FIRST_CHUNK_INDEX - 1, chunk1, Optional.<Integer>absent());
+            tracker3.addChunk(LeaderInstallSnapshotState.FIRST_CHUNK_INDEX - 1, chunk1, Optional.<Integer>absent());
             Assert.fail();
         } catch(SnapshotTracker.InvalidChunkException e){
 
@@ -80,10 +80,10 @@ public class SnapshotTrackerTest {
         // Out of sequence chunk indexes won't work
         SnapshotTracker tracker4 = new SnapshotTracker(logger, 2, "leader");
 
-        tracker4.addChunk(AbstractLeader.FIRST_CHUNK_INDEX, chunk1, Optional.<Integer>absent());
+        tracker4.addChunk(LeaderInstallSnapshotState.FIRST_CHUNK_INDEX, chunk1, Optional.<Integer>absent());
 
         try {
-            tracker4.addChunk(AbstractLeader.FIRST_CHUNK_INDEX+2, chunk2, Optional.<Integer>absent());
+            tracker4.addChunk(LeaderInstallSnapshotState.FIRST_CHUNK_INDEX+2, chunk2, Optional.<Integer>absent());
             Assert.fail();
         } catch(SnapshotTracker.InvalidChunkException e){
 
@@ -93,19 +93,19 @@ public class SnapshotTrackerTest {
         // If the lastChunkHashCode is missing
         SnapshotTracker tracker5 = new SnapshotTracker(logger, 2, "leader");
 
-        tracker5.addChunk(AbstractLeader.FIRST_CHUNK_INDEX, chunk1, Optional.<Integer>absent());
+        tracker5.addChunk(LeaderInstallSnapshotState.FIRST_CHUNK_INDEX, chunk1, Optional.<Integer>absent());
         // Look I can add the same chunk again
-        tracker5.addChunk(AbstractLeader.FIRST_CHUNK_INDEX + 1, chunk1, Optional.<Integer>absent());
+        tracker5.addChunk(LeaderInstallSnapshotState.FIRST_CHUNK_INDEX + 1, chunk1, Optional.<Integer>absent());
 
         // An exception will be thrown when an invalid chunk is addedd with the right sequence
         // when the lastChunkHashCode is present
         SnapshotTracker tracker6 = new SnapshotTracker(logger, 2, "leader");
 
-        tracker6.addChunk(AbstractLeader.FIRST_CHUNK_INDEX, chunk1, Optional.of(-1));
+        tracker6.addChunk(LeaderInstallSnapshotState.FIRST_CHUNK_INDEX, chunk1, Optional.of(-1));
 
         try {
             // Here we add a second chunk and tell addChunk that the previous chunk had a hash code 777
-            tracker6.addChunk(AbstractLeader.FIRST_CHUNK_INDEX + 1, chunk2, Optional.of(777));
+            tracker6.addChunk(LeaderInstallSnapshotState.FIRST_CHUNK_INDEX + 1, chunk2, Optional.of(777));
             Assert.fail();
         }catch(SnapshotTracker.InvalidChunkException e){
 
@@ -129,7 +129,7 @@ public class SnapshotTrackerTest {
 
         SnapshotTracker tracker2 = new SnapshotTracker(logger, 3, "leader");
 
-        tracker2.addChunk(1, chunk1, Optional.of(AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE));
+        tracker2.addChunk(1, chunk1, Optional.of(LeaderInstallSnapshotState.INITIAL_LAST_CHUNK_HASH_CODE));
         tracker2.addChunk(2, chunk2, Optional.of(Arrays.hashCode(chunk1)));
         tracker2.addChunk(3, chunk3, Optional.of(Arrays.hashCode(chunk2)));
 
@@ -144,7 +144,7 @@ public class SnapshotTrackerTest {
 
         ByteString chunks = ByteString.copyFrom(chunk1).concat(ByteString.copyFrom(chunk2));
 
-        tracker1.addChunk(1, chunk1, Optional.of(AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE));
+        tracker1.addChunk(1, chunk1, Optional.of(LeaderInstallSnapshotState.INITIAL_LAST_CHUNK_HASH_CODE));
         tracker1.addChunk(2, chunk2, Optional.of(Arrays.hashCode(chunk1)));
 
         assertEquals(chunks, tracker1.getCollectedChunks());