From 95d3c7975a423951dcbdecfbfa4cb6b7a23591cc Mon Sep 17 00:00:00 2001 From: Tom Pantelis Date: Fri, 9 Sep 2016 17:36:03 -0400 Subject: [PATCH] Bug 6540: Move LeaderInstallSnapshotState to FollowerLogInformation AbstractLeader maintains a Map of followerId -> LeaderInstallSnapshotState in parallel to the Map of followerId -> FollowerLogInformation. It makes sense to move the LeaderInstallSnapshotState into the FollowerLogInformation instead of maintaining 2 Maps. Change-Id: Ia0b58fad9bb2fde42d8c1ba4b0f7aae4eb11abb5 Signed-off-by: Tom Pantelis --- .../cluster/raft/FollowerLogInformation.java | 24 +++++ .../raft/FollowerLogInformationImpl.java | 21 +++++ .../raft/behaviors/AbstractLeader.java | 93 ++++++++++--------- .../behaviors/LeaderInstallSnapshotState.java | 8 +- .../cluster/raft/behaviors/LeaderTest.java | 27 ++---- 5 files changed, 105 insertions(+), 68 deletions(-) diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformation.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformation.java index 4367a7a151..c0855c7f71 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformation.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformation.java @@ -7,6 +7,10 @@ */ package org.opendaylight.controller.cluster.raft; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import org.opendaylight.controller.cluster.raft.behaviors.LeaderInstallSnapshotState; + /** * The state of the followers log as known by the Leader */ @@ -118,4 +122,24 @@ public interface FollowerLogInformation { * Sets the raft version of the follower. */ void setRaftVersion(short payloadVersion); + + /** + * Returns the LeaderInstallSnapshotState for the in progress install snapshot. + * + * @return the LeaderInstallSnapshotState if a snapshot install is in progress, null otherwise. + */ + @Nullable + LeaderInstallSnapshotState getInstallSnapshotState(); + + /** + * Sets the LeaderInstallSnapshotState when an install snapshot is initiated. + * + * @param state the LeaderInstallSnapshotState + */ + void setLeaderInstallSnapshotState(@Nonnull LeaderInstallSnapshotState state); + + /** + * Clears the LeaderInstallSnapshotState when an install snapshot is complete. + */ + void clearLeaderInstallSnapshotState(); } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformationImpl.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformationImpl.java index 8988446212..a8a33c30b2 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformationImpl.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformationImpl.java @@ -11,6 +11,9 @@ package org.opendaylight.controller.cluster.raft; import com.google.common.base.Preconditions; import com.google.common.base.Stopwatch; import java.util.concurrent.TimeUnit; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import org.opendaylight.controller.cluster.raft.behaviors.LeaderInstallSnapshotState; public class FollowerLogInformationImpl implements FollowerLogInformation { private final Stopwatch stopwatch = Stopwatch.createUnstarted(); @@ -35,6 +38,8 @@ public class FollowerLogInformationImpl implements FollowerLogInformation { private final PeerInfo peerInfo; + private LeaderInstallSnapshotState installSnapshotState; + public FollowerLogInformationImpl(PeerInfo peerInfo, long matchIndex, RaftActorContext context) { this.nextIndex = context.getCommitIndex(); this.matchIndex = matchIndex; @@ -169,6 +174,22 @@ public class FollowerLogInformationImpl implements FollowerLogInformation { this.raftVersion = raftVersion; } + @Override + @Nullable + public LeaderInstallSnapshotState getInstallSnapshotState() { + return installSnapshotState; + } + + @Override + public void setLeaderInstallSnapshotState(@Nonnull LeaderInstallSnapshotState state) { + this.installSnapshotState = Preconditions.checkNotNull(state); + } + + @Override + public void clearLeaderInstallSnapshotState() { + installSnapshotState = null; + } + @Override public String toString() { return "FollowerLogInformationImpl [id=" + getId() + ", nextIndex=" + nextIndex + ", matchIndex=" + matchIndex diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java index 7c439f7d5c..5076a8a38f 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java @@ -73,7 +73,6 @@ import scala.concurrent.duration.FiniteDuration; */ public abstract class AbstractLeader extends AbstractRaftActorBehavior { private final Map followerToLog = new HashMap<>(); - private final Map mapFollowerToSnapshot = new HashMap<>(); /** * Lookup table for request contexts based on journal index. We could use a {@link Map} here, but we really @@ -94,7 +93,6 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { if(initializeFromLeader != null) { followerToLog.putAll(initializeFromLeader.followerToLog); - mapFollowerToSnapshot.putAll(initializeFromLeader.mapFollowerToSnapshot); snapshot = initializeFromLeader.snapshot; trackers.addAll(initializeFromLeader.trackers); } else { @@ -143,7 +141,6 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { public void removeFollower(String followerId) { followerToLog.remove(followerId); - mapFollowerToSnapshot.remove(followerId); } public void updateMinReplicaCount() { @@ -177,6 +174,11 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { } } + @VisibleForTesting + boolean hasSnapshot() { + return snapshot.isPresent(); + } + @Override protected RaftActorBehavior handleAppendEntries(ActorRef sender, AppendEntries appendEntries) { @@ -437,29 +439,27 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { LOG.debug("{}: handleInstallSnapshotReply: {}", logName(), reply); String followerId = reply.getFollowerId(); - LeaderInstallSnapshotState followerToSnapshot = mapFollowerToSnapshot.get(followerId); - - if (followerToSnapshot == null) { - LOG.error("{}: FollowerToSnapshot not found for follower {} in InstallSnapshotReply", - logName(), followerId); - return; - } - FollowerLogInformation followerLogInformation = followerToLog.get(followerId); if(followerLogInformation == null) { // This can happen during AddServer if it times out. LOG.error("{}: FollowerLogInformation not found for follower {} in InstallSnapshotReply", logName(), followerId); - mapFollowerToSnapshot.remove(followerId); + return; + } + + LeaderInstallSnapshotState installSnapshotState = followerLogInformation.getInstallSnapshotState(); + if (installSnapshotState == null) { + LOG.error("{}: LeaderInstallSnapshotState not found for follower {} in InstallSnapshotReply", + logName(), followerId); return; } followerLogInformation.markFollowerActive(); - if (followerToSnapshot.getChunkIndex() == reply.getChunkIndex()) { + if (installSnapshotState.getChunkIndex() == reply.getChunkIndex()) { boolean wasLastChunk = false; if (reply.isSuccess()) { - if(followerToSnapshot.isLastChunk(reply.getChunkIndex())) { + if(installSnapshotState.isLastChunk(reply.getChunkIndex())) { //this was the last chunk reply if(LOG.isDebugEnabled()) { LOG.debug("{}: InstallSnapshotReply received, " + @@ -472,17 +472,18 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { long followerMatchIndex = snapshot.get().getLastIncludedIndex(); followerLogInformation.setMatchIndex(followerMatchIndex); followerLogInformation.setNextIndex(followerMatchIndex + 1); - mapFollowerToSnapshot.remove(followerId); + followerLogInformation.clearLeaderInstallSnapshotState(); LOG.debug("{}: follower: {}, matchIndex set to {}, nextIndex set to {}", logName(), followerId, followerLogInformation.getMatchIndex(), followerLogInformation.getNextIndex()); - if (mapFollowerToSnapshot.isEmpty()) { + if (!anyFollowersInstallingSnapshot()) { // once there are no pending followers receiving snapshots // we can remove snapshot from the memory setSnapshot(null); } + wasLastChunk = true; if(context.getPeerInfo(followerId).getVotingState() == VotingState.VOTING_NOT_INITIALIZED){ UnInitializedFollowerSnapshotReply unInitFollowerSnapshotSuccess = @@ -491,19 +492,19 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { LOG.debug("Sent message UnInitializedFollowerSnapshotReply to self"); } } else { - followerToSnapshot.markSendStatus(true); + installSnapshotState.markSendStatus(true); } } else { LOG.info("{}: InstallSnapshotReply received sending snapshot chunk failed, Will retry, Chunk: {}", logName(), reply.getChunkIndex()); - followerToSnapshot.markSendStatus(false); + installSnapshotState.markSendStatus(false); } if (wasLastChunk && !context.getSnapshotManager().isCapturing()) { // Since the follower is now caught up try to purge the log. purgeInMemoryLog(); - } else if (!wasLastChunk && followerToSnapshot.canSendNextChunk()) { + } else if (!wasLastChunk && installSnapshotState.canSendNextChunk()) { ActorSelection followerActor = context.getPeerActorSelection(followerId); if(followerActor != null) { sendSnapshotChunk(followerActor, followerId); @@ -513,16 +514,27 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { } else { LOG.error("{}: Chunk index {} in InstallSnapshotReply from follower {} does not match expected index {}", logName(), reply.getChunkIndex(), followerId, - followerToSnapshot.getChunkIndex()); + installSnapshotState.getChunkIndex()); if(reply.getChunkIndex() == LeaderInstallSnapshotState.INVALID_CHUNK_INDEX){ // Since the Follower did not find this index to be valid we should reset the follower snapshot // so that Installing the snapshot can resume from the beginning - followerToSnapshot.reset(); + installSnapshotState.reset(); } } } + private boolean anyFollowersInstallingSnapshot() { + for(FollowerLogInformation info: followerToLog.values()) { + if(info.getInstallSnapshotState() != null) { + return true; + } + + } + + return false; + } + private void replicate(Replicate replicate) { long logIndex = replicate.getReplicatedLogEntry().getIndex(); @@ -579,9 +591,10 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { boolean sendAppendEntries = false; List entries = Collections.emptyList(); - if (mapFollowerToSnapshot.get(followerId) != null) { + LeaderInstallSnapshotState installSnapshotState = followerLogInformation.getInstallSnapshotState(); + if (installSnapshotState != null) { // if install snapshot is in process , then sent next chunk if possible - if (isFollowerActive && mapFollowerToSnapshot.get(followerId).canSendNextChunk()) { + if (isFollowerActive && installSnapshotState.canSendNextChunk()) { sendSnapshotChunk(followerActor, followerId); } else if(sendHeartbeat) { // we send a heartbeat even if we have not received a reply for the last chunk @@ -724,11 +737,11 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { // Note: the previous call to getNextSnapshotChunk has the side-effect of adding // followerId to the followerToSnapshot map. - LeaderInstallSnapshotState followerToSnapshot = mapFollowerToSnapshot.get(followerId); + LeaderInstallSnapshotState installSnapshotState = followerToLog.get(followerId).getInstallSnapshotState(); - int nextChunkIndex = followerToSnapshot.incrementChunkIndex(); + int nextChunkIndex = installSnapshotState.incrementChunkIndex(); Optional serverConfig = Optional.absent(); - if(followerToSnapshot.isLastChunk(nextChunkIndex)) { + if(installSnapshotState.isLastChunk(nextChunkIndex)) { serverConfig = Optional.fromNullable(context.getPeerServerInfo(true)); } @@ -738,8 +751,8 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { snapshot.get().getLastIncludedTerm(), nextSnapshotChunk, nextChunkIndex, - followerToSnapshot.getTotalChunks(), - Optional.of(followerToSnapshot.getLastChunkHashCode()), + installSnapshotState.getTotalChunks(), + Optional.of(installSnapshotState.getLastChunkHashCode()), serverConfig ).toSerializable(followerToLog.get(followerId).getRaftVersion()), actor() @@ -747,8 +760,8 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { if(LOG.isDebugEnabled()) { LOG.debug("{}: InstallSnapshot sent to follower {}, Chunk: {}/{}", - logName(), followerActor.path(), followerToSnapshot.getChunkIndex(), - followerToSnapshot.getTotalChunks()); + logName(), followerActor.path(), installSnapshotState.getChunkIndex(), + installSnapshotState.getTotalChunks()); } } } catch (IOException e) { @@ -761,13 +774,13 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { * creates and return a ByteString chunk */ private byte[] getNextSnapshotChunk(String followerId, ByteString snapshotBytes) throws IOException { - LeaderInstallSnapshotState followerToSnapshot = mapFollowerToSnapshot.get(followerId); - if (followerToSnapshot == null) { - followerToSnapshot = new LeaderInstallSnapshotState(snapshotBytes, context.getConfigParams().getSnapshotChunkSize(), + LeaderInstallSnapshotState installSnapshotState = followerToLog.get(followerId).getInstallSnapshotState(); + if (installSnapshotState == null) { + installSnapshotState = new LeaderInstallSnapshotState(snapshotBytes, context.getConfigParams().getSnapshotChunkSize(), logName()); - mapFollowerToSnapshot.put(followerId, followerToSnapshot); + followerToLog.get(followerId).setLeaderInstallSnapshotState(installSnapshotState); } - byte[] nextChunk = followerToSnapshot.getNextChunk(); + byte[] nextChunk = installSnapshotState.getNextChunk(); LOG.debug("{}: next snapshot chunk size for follower {}: {}", logName(), followerId, nextChunk.length); @@ -857,16 +870,6 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { return followerToLog.get(followerId); } - @VisibleForTesting - protected void setFollowerSnapshot(String followerId, LeaderInstallSnapshotState snapshot) { - mapFollowerToSnapshot.put(followerId, snapshot); - } - - @VisibleForTesting - public int followerSnapshotSize() { - return mapFollowerToSnapshot.size(); - } - @VisibleForTesting public int followerLogSize() { return followerToLog.size(); diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderInstallSnapshotState.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderInstallSnapshotState.java index 100c99bb5e..81c3eff451 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderInstallSnapshotState.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderInstallSnapshotState.java @@ -15,7 +15,7 @@ import org.slf4j.LoggerFactory; /** * Encapsulates the leader state and logic for sending snapshot chunks to a follower. */ -class LeaderInstallSnapshotState { +public final class LeaderInstallSnapshotState { private static final Logger LOG = LoggerFactory.getLogger(LeaderInstallSnapshotState.class); // The index of the first chunk that is sent when installing a snapshot @@ -40,12 +40,12 @@ class LeaderInstallSnapshotState { private int lastChunkHashCode = INITIAL_LAST_CHUNK_HASH_CODE; private int nextChunkHashCode = INITIAL_LAST_CHUNK_HASH_CODE; - public LeaderInstallSnapshotState(ByteString snapshotBytes, int snapshotChunkSize, String logName) { + LeaderInstallSnapshotState(ByteString snapshotBytes, int snapshotChunkSize, String logName) { this.snapshotChunkSize = snapshotChunkSize; this.snapshotBytes = snapshotBytes; this.logName = logName; int size = snapshotBytes.size(); - totalChunks = (size / snapshotChunkSize) + + totalChunks = size / snapshotChunkSize + (size % snapshotChunkSize > 0 ? 1 : 0); LOG.debug("{}: Snapshot {} bytes, total chunks to send: {}", logName, size, totalChunks); @@ -110,7 +110,7 @@ class LeaderInstallSnapshotState { int size = snapshotChunkSize; if (snapshotChunkSize > snapshotLength) { size = snapshotLength; - } else if ((start + snapshotChunkSize) > snapshotLength) { + } else if (start + snapshotChunkSize > snapshotLength) { size = snapshotLength - start; } diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java index 41278799ed..9e91e514d7 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java @@ -9,6 +9,7 @@ package org.opendaylight.controller.cluster.raft.behaviors; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertSame; @@ -586,7 +587,7 @@ public class LeaderTest extends AbstractLeaderTest { commitIndex, snapshotTerm, commitIndex, snapshotTerm)); LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState(bs, actorContext.getConfigParams().getSnapshotChunkSize(), leader.logName()); - leader.setFollowerSnapshot(FOLLOWER_ID, fts); + leader.getFollower(FOLLOWER_ID).setLeaderInstallSnapshotState(fts); //send first chunk and no InstallSnapshotReply received yet fts.getNextChunk(); @@ -920,7 +921,7 @@ public class LeaderTest extends AbstractLeaderTest { commitIndex, snapshotTerm, commitIndex, snapshotTerm)); LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState(bs, actorContext.getConfigParams().getSnapshotChunkSize(), leader.logName()); - leader.setFollowerSnapshot(FOLLOWER_ID, fts); + leader.getFollower(FOLLOWER_ID).setLeaderInstallSnapshotState(fts); while(!fts.isLastChunk(fts.getChunkIndex())) { fts.getNextChunk(); fts.incrementChunkIndex(); @@ -934,12 +935,13 @@ public class LeaderTest extends AbstractLeaderTest { assertTrue(raftBehavior instanceof Leader); - assertEquals(0, leader.followerSnapshotSize()); assertEquals(1, leader.followerLogSize()); FollowerLogInformation fli = leader.getFollower(FOLLOWER_ID); assertNotNull(fli); + assertNull(fli.getInstallSnapshotState()); assertEquals(commitIndex, fli.getMatchIndex()); assertEquals(commitIndex + 1, fli.getNextIndex()); + assertFalse(leader.hasSnapshot()); } @Test @@ -1147,19 +1149,8 @@ public class LeaderTest extends AbstractLeaderTest { } @Test - public void testFollowerToSnapshotLogic() { - logStart("testFollowerToSnapshotLogic"); - - MockRaftActorContext actorContext = createActorContext(); - - actorContext.setConfigParams(new DefaultConfigParamsImpl() { - @Override - public int getSnapshotChunkSize() { - return 50; - } - }); - - leader = new Leader(actorContext); + public void testLeaderInstallSnapshotState() { + logStart("testLeaderInstallSnapshotState"); Map leadersSnapshot = new HashMap<>(); leadersSnapshot.put("1", "A"); @@ -1169,9 +1160,7 @@ public class LeaderTest extends AbstractLeaderTest { ByteString bs = toByteString(leadersSnapshot); byte[] barray = bs.toByteArray(); - LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState(bs, - actorContext.getConfigParams().getSnapshotChunkSize(), leader.logName()); - leader.setFollowerSnapshot(FOLLOWER_ID, fts); + LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState(bs, 50, "test"); assertEquals(bs.size(), barray.length); -- 2.36.6