Bug 6540: Move LeaderInstallSnapshotState to FollowerLogInformation 14/45514/3
authorTom Pantelis <tpanteli@brocade.com>
Fri, 9 Sep 2016 21:36:03 +0000 (17:36 -0400)
committerTom Pantelis <tpanteli@brocade.com>
Sun, 25 Sep 2016 19:24:10 +0000 (19:24 +0000)
AbstractLeader maintains a Map of followerId -> LeaderInstallSnapshotState
in parallel to the Map of followerId -> FollowerLogInformation. It makes
sense to move the LeaderInstallSnapshotState into the FollowerLogInformation
instead of maintaining 2 Maps.

Change-Id: Ia0b58fad9bb2fde42d8c1ba4b0f7aae4eb11abb5
Signed-off-by: Tom Pantelis <tpanteli@brocade.com>
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformation.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformationImpl.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderInstallSnapshotState.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java

index 4367a7a..c0855c7 100644 (file)
@@ -7,6 +7,10 @@
  */
 package org.opendaylight.controller.cluster.raft;
 
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import org.opendaylight.controller.cluster.raft.behaviors.LeaderInstallSnapshotState;
+
 /**
  * The state of the followers log as known by the Leader
  */
@@ -118,4 +122,24 @@ public interface FollowerLogInformation {
      * Sets the raft version of the follower.
      */
     void setRaftVersion(short payloadVersion);
+
+    /**
+     * Returns the LeaderInstallSnapshotState for the in progress install snapshot.
+     *
+     * @return the LeaderInstallSnapshotState if a snapshot install is in progress, null otherwise.
+     */
+    @Nullable
+    LeaderInstallSnapshotState getInstallSnapshotState();
+
+    /**
+     * Sets the LeaderInstallSnapshotState when an install snapshot is initiated.
+     *
+     * @param state the LeaderInstallSnapshotState
+     */
+    void setLeaderInstallSnapshotState(@Nonnull LeaderInstallSnapshotState state);
+
+    /**
+     * Clears the LeaderInstallSnapshotState when an install snapshot is complete.
+     */
+    void clearLeaderInstallSnapshotState();
 }
index 8988446..a8a33c3 100644 (file)
@@ -11,6 +11,9 @@ package org.opendaylight.controller.cluster.raft;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Stopwatch;
 import java.util.concurrent.TimeUnit;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import org.opendaylight.controller.cluster.raft.behaviors.LeaderInstallSnapshotState;
 
 public class FollowerLogInformationImpl implements FollowerLogInformation {
     private final Stopwatch stopwatch = Stopwatch.createUnstarted();
@@ -35,6 +38,8 @@ public class FollowerLogInformationImpl implements FollowerLogInformation {
 
     private final PeerInfo peerInfo;
 
+    private LeaderInstallSnapshotState installSnapshotState;
+
     public FollowerLogInformationImpl(PeerInfo peerInfo, long matchIndex, RaftActorContext context) {
         this.nextIndex = context.getCommitIndex();
         this.matchIndex = matchIndex;
@@ -169,6 +174,22 @@ public class FollowerLogInformationImpl implements FollowerLogInformation {
         this.raftVersion = raftVersion;
     }
 
+    @Override
+    @Nullable
+    public LeaderInstallSnapshotState getInstallSnapshotState() {
+        return installSnapshotState;
+    }
+
+    @Override
+    public void setLeaderInstallSnapshotState(@Nonnull LeaderInstallSnapshotState state) {
+        this.installSnapshotState = Preconditions.checkNotNull(state);
+    }
+
+    @Override
+    public void clearLeaderInstallSnapshotState() {
+        installSnapshotState = null;
+    }
+
     @Override
     public String toString() {
         return "FollowerLogInformationImpl [id=" + getId() + ", nextIndex=" + nextIndex + ", matchIndex=" + matchIndex
index 7c439f7..5076a8a 100644 (file)
@@ -73,7 +73,6 @@ import scala.concurrent.duration.FiniteDuration;
  */
 public abstract class AbstractLeader extends AbstractRaftActorBehavior {
     private final Map<String, FollowerLogInformation> followerToLog = new HashMap<>();
-    private final Map<String, LeaderInstallSnapshotState> mapFollowerToSnapshot = new HashMap<>();
 
     /**
      * Lookup table for request contexts based on journal index. We could use a {@link Map} here, but we really
@@ -94,7 +93,6 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
 
         if(initializeFromLeader != null) {
             followerToLog.putAll(initializeFromLeader.followerToLog);
-            mapFollowerToSnapshot.putAll(initializeFromLeader.mapFollowerToSnapshot);
             snapshot = initializeFromLeader.snapshot;
             trackers.addAll(initializeFromLeader.trackers);
         } else {
@@ -143,7 +141,6 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
 
     public void removeFollower(String followerId) {
         followerToLog.remove(followerId);
-        mapFollowerToSnapshot.remove(followerId);
     }
 
     public void updateMinReplicaCount() {
@@ -177,6 +174,11 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
         }
     }
 
+    @VisibleForTesting
+    boolean hasSnapshot() {
+        return snapshot.isPresent();
+    }
+
     @Override
     protected RaftActorBehavior handleAppendEntries(ActorRef sender,
         AppendEntries appendEntries) {
@@ -437,29 +439,27 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
         LOG.debug("{}: handleInstallSnapshotReply: {}", logName(), reply);
 
         String followerId = reply.getFollowerId();
-        LeaderInstallSnapshotState followerToSnapshot = mapFollowerToSnapshot.get(followerId);
-
-        if (followerToSnapshot == null) {
-            LOG.error("{}: FollowerToSnapshot not found for follower {} in InstallSnapshotReply",
-                    logName(), followerId);
-            return;
-        }
-
         FollowerLogInformation followerLogInformation = followerToLog.get(followerId);
         if(followerLogInformation == null) {
             // This can happen during AddServer if it times out.
             LOG.error("{}: FollowerLogInformation not found for follower {} in InstallSnapshotReply",
                     logName(), followerId);
-            mapFollowerToSnapshot.remove(followerId);
+            return;
+        }
+
+        LeaderInstallSnapshotState installSnapshotState = followerLogInformation.getInstallSnapshotState();
+        if (installSnapshotState == null) {
+            LOG.error("{}: LeaderInstallSnapshotState not found for follower {} in InstallSnapshotReply",
+                    logName(), followerId);
             return;
         }
 
         followerLogInformation.markFollowerActive();
 
-        if (followerToSnapshot.getChunkIndex() == reply.getChunkIndex()) {
+        if (installSnapshotState.getChunkIndex() == reply.getChunkIndex()) {
             boolean wasLastChunk = false;
             if (reply.isSuccess()) {
-                if(followerToSnapshot.isLastChunk(reply.getChunkIndex())) {
+                if(installSnapshotState.isLastChunk(reply.getChunkIndex())) {
                     //this was the last chunk reply
                     if(LOG.isDebugEnabled()) {
                         LOG.debug("{}: InstallSnapshotReply received, " +
@@ -472,17 +472,18 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
                     long followerMatchIndex = snapshot.get().getLastIncludedIndex();
                     followerLogInformation.setMatchIndex(followerMatchIndex);
                     followerLogInformation.setNextIndex(followerMatchIndex + 1);
-                    mapFollowerToSnapshot.remove(followerId);
+                    followerLogInformation.clearLeaderInstallSnapshotState();
 
                     LOG.debug("{}: follower: {}, matchIndex set to {}, nextIndex set to {}",
                         logName(), followerId, followerLogInformation.getMatchIndex(),
                         followerLogInformation.getNextIndex());
 
-                    if (mapFollowerToSnapshot.isEmpty()) {
+                    if (!anyFollowersInstallingSnapshot()) {
                         // once there are no pending followers receiving snapshots
                         // we can remove snapshot from the memory
                         setSnapshot(null);
                     }
+
                     wasLastChunk = true;
                     if(context.getPeerInfo(followerId).getVotingState() == VotingState.VOTING_NOT_INITIALIZED){
                         UnInitializedFollowerSnapshotReply unInitFollowerSnapshotSuccess =
@@ -491,19 +492,19 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
                         LOG.debug("Sent message UnInitializedFollowerSnapshotReply to self");
                     }
                 } else {
-                    followerToSnapshot.markSendStatus(true);
+                    installSnapshotState.markSendStatus(true);
                 }
             } else {
                 LOG.info("{}: InstallSnapshotReply received sending snapshot chunk failed, Will retry, Chunk: {}",
                         logName(), reply.getChunkIndex());
 
-                followerToSnapshot.markSendStatus(false);
+                installSnapshotState.markSendStatus(false);
             }
 
             if (wasLastChunk && !context.getSnapshotManager().isCapturing()) {
                 // Since the follower is now caught up try to purge the log.
                 purgeInMemoryLog();
-            } else if (!wasLastChunk && followerToSnapshot.canSendNextChunk()) {
+            } else if (!wasLastChunk && installSnapshotState.canSendNextChunk()) {
                 ActorSelection followerActor = context.getPeerActorSelection(followerId);
                 if(followerActor != null) {
                     sendSnapshotChunk(followerActor, followerId);
@@ -513,16 +514,27 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
         } else {
             LOG.error("{}: Chunk index {} in InstallSnapshotReply from follower {} does not match expected index {}",
                     logName(), reply.getChunkIndex(), followerId,
-                    followerToSnapshot.getChunkIndex());
+                    installSnapshotState.getChunkIndex());
 
             if(reply.getChunkIndex() == LeaderInstallSnapshotState.INVALID_CHUNK_INDEX){
                 // Since the Follower did not find this index to be valid we should reset the follower snapshot
                 // so that Installing the snapshot can resume from the beginning
-                followerToSnapshot.reset();
+                installSnapshotState.reset();
             }
         }
     }
 
+    private boolean anyFollowersInstallingSnapshot() {
+        for(FollowerLogInformation info: followerToLog.values()) {
+            if(info.getInstallSnapshotState() != null) {
+                return true;
+            }
+
+        }
+
+        return false;
+    }
+
     private void replicate(Replicate replicate) {
         long logIndex = replicate.getReplicatedLogEntry().getIndex();
 
@@ -579,9 +591,10 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
             boolean sendAppendEntries = false;
             List<ReplicatedLogEntry> entries = Collections.emptyList();
 
-            if (mapFollowerToSnapshot.get(followerId) != null) {
+            LeaderInstallSnapshotState installSnapshotState = followerLogInformation.getInstallSnapshotState();
+            if (installSnapshotState != null) {
                 // if install snapshot is in process , then sent next chunk if possible
-                if (isFollowerActive && mapFollowerToSnapshot.get(followerId).canSendNextChunk()) {
+                if (isFollowerActive && installSnapshotState.canSendNextChunk()) {
                     sendSnapshotChunk(followerActor, followerId);
                 } else if(sendHeartbeat) {
                     // we send a heartbeat even if we have not received a reply for the last chunk
@@ -724,11 +737,11 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
 
                 // Note: the previous call to getNextSnapshotChunk has the side-effect of adding
                 // followerId to the followerToSnapshot map.
-                LeaderInstallSnapshotState followerToSnapshot = mapFollowerToSnapshot.get(followerId);
+                LeaderInstallSnapshotState installSnapshotState = followerToLog.get(followerId).getInstallSnapshotState();
 
-                int nextChunkIndex = followerToSnapshot.incrementChunkIndex();
+                int nextChunkIndex = installSnapshotState.incrementChunkIndex();
                 Optional<ServerConfigurationPayload> serverConfig = Optional.absent();
-                if(followerToSnapshot.isLastChunk(nextChunkIndex)) {
+                if(installSnapshotState.isLastChunk(nextChunkIndex)) {
                     serverConfig = Optional.fromNullable(context.getPeerServerInfo(true));
                 }
 
@@ -738,8 +751,8 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
                         snapshot.get().getLastIncludedTerm(),
                         nextSnapshotChunk,
                         nextChunkIndex,
-                        followerToSnapshot.getTotalChunks(),
-                        Optional.of(followerToSnapshot.getLastChunkHashCode()),
+                        installSnapshotState.getTotalChunks(),
+                        Optional.of(installSnapshotState.getLastChunkHashCode()),
                         serverConfig
                     ).toSerializable(followerToLog.get(followerId).getRaftVersion()),
                     actor()
@@ -747,8 +760,8 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
 
                 if(LOG.isDebugEnabled()) {
                     LOG.debug("{}: InstallSnapshot sent to follower {}, Chunk: {}/{}",
-                            logName(), followerActor.path(), followerToSnapshot.getChunkIndex(),
-                            followerToSnapshot.getTotalChunks());
+                            logName(), followerActor.path(), installSnapshotState.getChunkIndex(),
+                            installSnapshotState.getTotalChunks());
                 }
             }
         } catch (IOException e) {
@@ -761,13 +774,13 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
      * creates and return a ByteString chunk
      */
     private byte[] getNextSnapshotChunk(String followerId, ByteString snapshotBytes) throws IOException {
-        LeaderInstallSnapshotState followerToSnapshot = mapFollowerToSnapshot.get(followerId);
-        if (followerToSnapshot == null) {
-            followerToSnapshot = new LeaderInstallSnapshotState(snapshotBytes, context.getConfigParams().getSnapshotChunkSize(),
+        LeaderInstallSnapshotState installSnapshotState = followerToLog.get(followerId).getInstallSnapshotState();
+        if (installSnapshotState == null) {
+            installSnapshotState = new LeaderInstallSnapshotState(snapshotBytes, context.getConfigParams().getSnapshotChunkSize(),
                     logName());
-            mapFollowerToSnapshot.put(followerId, followerToSnapshot);
+            followerToLog.get(followerId).setLeaderInstallSnapshotState(installSnapshotState);
         }
-        byte[] nextChunk = followerToSnapshot.getNextChunk();
+        byte[] nextChunk = installSnapshotState.getNextChunk();
 
         LOG.debug("{}: next snapshot chunk size for follower {}: {}", logName(), followerId, nextChunk.length);
 
@@ -857,16 +870,6 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
         return followerToLog.get(followerId);
     }
 
-    @VisibleForTesting
-    protected void setFollowerSnapshot(String followerId, LeaderInstallSnapshotState snapshot) {
-        mapFollowerToSnapshot.put(followerId, snapshot);
-    }
-
-    @VisibleForTesting
-    public int followerSnapshotSize() {
-        return mapFollowerToSnapshot.size();
-    }
-
     @VisibleForTesting
     public int followerLogSize() {
         return followerToLog.size();
index 100c99b..81c3eff 100644 (file)
@@ -15,7 +15,7 @@ import org.slf4j.LoggerFactory;
 /**
  * Encapsulates the leader state and logic for sending snapshot chunks to a follower.
  */
-class LeaderInstallSnapshotState {
+public final class LeaderInstallSnapshotState {
     private static final Logger LOG = LoggerFactory.getLogger(LeaderInstallSnapshotState.class);
 
     // The index of the first chunk that is sent when installing a snapshot
@@ -40,12 +40,12 @@ class LeaderInstallSnapshotState {
     private int lastChunkHashCode = INITIAL_LAST_CHUNK_HASH_CODE;
     private int nextChunkHashCode = INITIAL_LAST_CHUNK_HASH_CODE;
 
-    public LeaderInstallSnapshotState(ByteString snapshotBytes, int snapshotChunkSize, String logName) {
+    LeaderInstallSnapshotState(ByteString snapshotBytes, int snapshotChunkSize, String logName) {
         this.snapshotChunkSize = snapshotChunkSize;
         this.snapshotBytes = snapshotBytes;
         this.logName = logName;
         int size = snapshotBytes.size();
-        totalChunks = (size / snapshotChunkSize) +
+        totalChunks = size / snapshotChunkSize +
                 (size % snapshotChunkSize > 0 ? 1 : 0);
 
         LOG.debug("{}: Snapshot {} bytes, total chunks to send: {}", logName, size, totalChunks);
@@ -110,7 +110,7 @@ class LeaderInstallSnapshotState {
         int size = snapshotChunkSize;
         if (snapshotChunkSize > snapshotLength) {
             size = snapshotLength;
-        } else if ((start + snapshotChunkSize) > snapshotLength) {
+        } else if (start + snapshotChunkSize > snapshotLength) {
             size = snapshotLength - start;
         }
 
index 4127879..9e91e51 100644 (file)
@@ -9,6 +9,7 @@
 package org.opendaylight.controller.cluster.raft.behaviors;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertSame;
@@ -586,7 +587,7 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
                 commitIndex, snapshotTerm, commitIndex, snapshotTerm));
         LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState(bs,
                 actorContext.getConfigParams().getSnapshotChunkSize(), leader.logName());
-        leader.setFollowerSnapshot(FOLLOWER_ID, fts);
+        leader.getFollower(FOLLOWER_ID).setLeaderInstallSnapshotState(fts);
 
         //send first chunk and no InstallSnapshotReply received yet
         fts.getNextChunk();
@@ -920,7 +921,7 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
                 commitIndex, snapshotTerm, commitIndex, snapshotTerm));
         LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState(bs,
                 actorContext.getConfigParams().getSnapshotChunkSize(), leader.logName());
-        leader.setFollowerSnapshot(FOLLOWER_ID, fts);
+        leader.getFollower(FOLLOWER_ID).setLeaderInstallSnapshotState(fts);
         while(!fts.isLastChunk(fts.getChunkIndex())) {
             fts.getNextChunk();
             fts.incrementChunkIndex();
@@ -934,12 +935,13 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
 
         assertTrue(raftBehavior instanceof Leader);
 
-        assertEquals(0, leader.followerSnapshotSize());
         assertEquals(1, leader.followerLogSize());
         FollowerLogInformation fli = leader.getFollower(FOLLOWER_ID);
         assertNotNull(fli);
+        assertNull(fli.getInstallSnapshotState());
         assertEquals(commitIndex, fli.getMatchIndex());
         assertEquals(commitIndex + 1, fli.getNextIndex());
+        assertFalse(leader.hasSnapshot());
     }
 
     @Test
@@ -1147,19 +1149,8 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
     }
 
     @Test
-    public void testFollowerToSnapshotLogic() {
-        logStart("testFollowerToSnapshotLogic");
-
-        MockRaftActorContext actorContext = createActorContext();
-
-        actorContext.setConfigParams(new DefaultConfigParamsImpl() {
-            @Override
-            public int getSnapshotChunkSize() {
-                return 50;
-            }
-        });
-
-        leader = new Leader(actorContext);
+    public void testLeaderInstallSnapshotState() {
+        logStart("testLeaderInstallSnapshotState");
 
         Map<String, String> leadersSnapshot = new HashMap<>();
         leadersSnapshot.put("1", "A");
@@ -1169,9 +1160,7 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
         ByteString bs = toByteString(leadersSnapshot);
         byte[] barray = bs.toByteArray();
 
-        LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState(bs,
-                actorContext.getConfigParams().getSnapshotChunkSize(), leader.logName());
-        leader.setFollowerSnapshot(FOLLOWER_ID, fts);
+        LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState(bs, 50, "test");
 
         assertEquals(bs.size(), barray.length);
 

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.