From: Tom Pantelis Date: Fri, 9 Sep 2016 18:10:47 +0000 (-0400) Subject: Bug 6540: Refactor FollowerToSnapshot to its own class X-Git-Tag: release/carbon~461 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=d3e310b940b60f6590f0e94a576aece95a055942;ds=sidebyside Bug 6540: Refactor FollowerToSnapshot to its own class Refactored FollowerToSnapshot to its own class and renamed to LeaderInstallSnapshotState. This will facilitate subsequent patches. Change-Id: Ie2540ddce1869a9972c8f3d547b0567c3d663aff Signed-off-by: Tom Pantelis --- diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java index b241e0a67a..7c439f7d5c 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java @@ -16,7 +16,6 @@ import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.protobuf.ByteString; import java.io.IOException; -import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -73,18 +72,8 @@ import scala.concurrent.duration.FiniteDuration; * set commitIndex = N (§5.3, §5.4). */ public abstract class AbstractLeader extends AbstractRaftActorBehavior { - - // The index of the first chunk that is sent when installing a snapshot - public static final int FIRST_CHUNK_INDEX = 1; - - // The index that the follower should respond with if it needs the install snapshot to be reset - public static final int INVALID_CHUNK_INDEX = -1; - - // This would be passed as the hash code of the last chunk when sending the first chunk - public static final int INITIAL_LAST_CHUNK_HASH_CODE = -1; - private final Map followerToLog = new HashMap<>(); - private final Map mapFollowerToSnapshot = new HashMap<>(); + private final Map mapFollowerToSnapshot = new HashMap<>(); /** * Lookup table for request contexts based on journal index. We could use a {@link Map} here, but we really @@ -448,7 +437,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { LOG.debug("{}: handleInstallSnapshotReply: {}", logName(), reply); String followerId = reply.getFollowerId(); - FollowerToSnapshot followerToSnapshot = mapFollowerToSnapshot.get(followerId); + LeaderInstallSnapshotState followerToSnapshot = mapFollowerToSnapshot.get(followerId); if (followerToSnapshot == null) { LOG.error("{}: FollowerToSnapshot not found for follower {} in InstallSnapshotReply", @@ -526,7 +515,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { logName(), reply.getChunkIndex(), followerId, followerToSnapshot.getChunkIndex()); - if(reply.getChunkIndex() == INVALID_CHUNK_INDEX){ + if(reply.getChunkIndex() == LeaderInstallSnapshotState.INVALID_CHUNK_INDEX){ // Since the Follower did not find this index to be valid we should reset the follower snapshot // so that Installing the snapshot can resume from the beginning followerToSnapshot.reset(); @@ -735,7 +724,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { // Note: the previous call to getNextSnapshotChunk has the side-effect of adding // followerId to the followerToSnapshot map. - FollowerToSnapshot followerToSnapshot = mapFollowerToSnapshot.get(followerId); + LeaderInstallSnapshotState followerToSnapshot = mapFollowerToSnapshot.get(followerId); int nextChunkIndex = followerToSnapshot.incrementChunkIndex(); Optional serverConfig = Optional.absent(); @@ -772,9 +761,10 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { * creates and return a ByteString chunk */ private byte[] getNextSnapshotChunk(String followerId, ByteString snapshotBytes) throws IOException { - FollowerToSnapshot followerToSnapshot = mapFollowerToSnapshot.get(followerId); + LeaderInstallSnapshotState followerToSnapshot = mapFollowerToSnapshot.get(followerId); if (followerToSnapshot == null) { - followerToSnapshot = new FollowerToSnapshot(snapshotBytes); + followerToSnapshot = new LeaderInstallSnapshotState(snapshotBytes, context.getConfigParams().getSnapshotChunkSize(), + logName()); mapFollowerToSnapshot.put(followerId, followerToSnapshot); } byte[] nextChunk = followerToSnapshot.getNextChunk(); @@ -845,120 +835,6 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { return minPresent != 0; } - /** - * Encapsulates the snapshot bytestring and handles the logic of sending - * snapshot chunks - */ - protected class FollowerToSnapshot { - private final ByteString snapshotBytes; - private int offset = 0; - // the next snapshot chunk is sent only if the replyReceivedForOffset matches offset - private int replyReceivedForOffset; - // if replyStatus is false, the previous chunk is attempted - private boolean replyStatus = false; - private int chunkIndex; - private final int totalChunks; - private int lastChunkHashCode = AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE; - private int nextChunkHashCode = AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE; - - public FollowerToSnapshot(ByteString snapshotBytes) { - this.snapshotBytes = snapshotBytes; - int size = snapshotBytes.size(); - totalChunks = (size / context.getConfigParams().getSnapshotChunkSize()) + - (size % context.getConfigParams().getSnapshotChunkSize() > 0 ? 1 : 0); - if(LOG.isDebugEnabled()) { - LOG.debug("{}: Snapshot {} bytes, total chunks to send:{}", - logName(), size, totalChunks); - } - replyReceivedForOffset = -1; - chunkIndex = AbstractLeader.FIRST_CHUNK_INDEX; - } - - public ByteString getSnapshotBytes() { - return snapshotBytes; - } - - public int incrementOffset() { - if(replyStatus) { - // if prev chunk failed, we would want to sent the same chunk again - offset = offset + context.getConfigParams().getSnapshotChunkSize(); - } - return offset; - } - - public int incrementChunkIndex() { - if (replyStatus) { - // if prev chunk failed, we would want to sent the same chunk again - chunkIndex = chunkIndex + 1; - } - return chunkIndex; - } - - public int getChunkIndex() { - return chunkIndex; - } - - public int getTotalChunks() { - return totalChunks; - } - - public boolean canSendNextChunk() { - // we only send a false if a chunk is sent but we have not received a reply yet - return replyReceivedForOffset == offset; - } - - public boolean isLastChunk(int chunkIndex) { - return totalChunks == chunkIndex; - } - - public void markSendStatus(boolean success) { - if (success) { - // if the chunk sent was successful - replyReceivedForOffset = offset; - replyStatus = true; - lastChunkHashCode = nextChunkHashCode; - } else { - // if the chunk sent was failure - replyReceivedForOffset = offset; - replyStatus = false; - } - } - - public byte[] getNextChunk() { - int snapshotLength = getSnapshotBytes().size(); - int start = incrementOffset(); - int size = context.getConfigParams().getSnapshotChunkSize(); - if (context.getConfigParams().getSnapshotChunkSize() > snapshotLength) { - size = snapshotLength; - } else if ((start + context.getConfigParams().getSnapshotChunkSize()) > snapshotLength) { - size = snapshotLength - start; - } - - byte[] nextChunk = new byte[size]; - getSnapshotBytes().copyTo(nextChunk, start, 0, size); - nextChunkHashCode = Arrays.hashCode(nextChunk); - - LOG.debug("{}: Next chunk: total length={}, offset={}, size={}, hashCode={}", logName(), - snapshotLength, start, size, nextChunkHashCode); - return nextChunk; - } - - /** - * reset should be called when the Follower needs to be sent the snapshot from the beginning - */ - public void reset(){ - offset = 0; - replyStatus = false; - replyReceivedForOffset = offset; - chunkIndex = AbstractLeader.FIRST_CHUNK_INDEX; - lastChunkHashCode = AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE; - } - - public int getLastChunkHashCode() { - return lastChunkHashCode; - } - } - // called from example-actor for printing the follower-states public String printFollowerStates() { final StringBuilder sb = new StringBuilder(); @@ -982,7 +858,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { } @VisibleForTesting - protected void setFollowerSnapshot(String followerId, FollowerToSnapshot snapshot) { + protected void setFollowerSnapshot(String followerId, LeaderInstallSnapshotState snapshot) { mapFollowerToSnapshot.put(followerId, snapshot); } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderInstallSnapshotState.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderInstallSnapshotState.java new file mode 100644 index 0000000000..100c99bb5e --- /dev/null +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderInstallSnapshotState.java @@ -0,0 +1,140 @@ +/* + * Copyright (c) 2016 2015 Brocade Communications Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.raft.behaviors; + +import com.google.protobuf.ByteString; +import java.util.Arrays; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Encapsulates the leader state and logic for sending snapshot chunks to a follower. + */ +class LeaderInstallSnapshotState { + private static final Logger LOG = LoggerFactory.getLogger(LeaderInstallSnapshotState.class); + + // The index of the first chunk that is sent when installing a snapshot + static final int FIRST_CHUNK_INDEX = 1; + + // The index that the follower should respond with if it needs the install snapshot to be reset + static final int INVALID_CHUNK_INDEX = -1; + + // This would be passed as the hash code of the last chunk when sending the first chunk + static final int INITIAL_LAST_CHUNK_HASH_CODE = -1; + + private int snapshotChunkSize; + private final ByteString snapshotBytes; + private final String logName; + private int offset = 0; + // the next snapshot chunk is sent only if the replyReceivedForOffset matches offset + private int replyReceivedForOffset; + // if replyStatus is false, the previous chunk is attempted + private boolean replyStatus = false; + private int chunkIndex; + private final int totalChunks; + private int lastChunkHashCode = INITIAL_LAST_CHUNK_HASH_CODE; + private int nextChunkHashCode = INITIAL_LAST_CHUNK_HASH_CODE; + + public LeaderInstallSnapshotState(ByteString snapshotBytes, int snapshotChunkSize, String logName) { + this.snapshotChunkSize = snapshotChunkSize; + this.snapshotBytes = snapshotBytes; + this.logName = logName; + int size = snapshotBytes.size(); + totalChunks = (size / snapshotChunkSize) + + (size % snapshotChunkSize > 0 ? 1 : 0); + + LOG.debug("{}: Snapshot {} bytes, total chunks to send: {}", logName, size, totalChunks); + + replyReceivedForOffset = -1; + chunkIndex = FIRST_CHUNK_INDEX; + } + + ByteString getSnapshotBytes() { + return snapshotBytes; + } + + int incrementOffset() { + if(replyStatus) { + // if prev chunk failed, we would want to sent the same chunk again + offset = offset + snapshotChunkSize; + } + return offset; + } + + int incrementChunkIndex() { + if (replyStatus) { + // if prev chunk failed, we would want to sent the same chunk again + chunkIndex = chunkIndex + 1; + } + return chunkIndex; + } + + int getChunkIndex() { + return chunkIndex; + } + + int getTotalChunks() { + return totalChunks; + } + + boolean canSendNextChunk() { + // we only send a false if a chunk is sent but we have not received a reply yet + return replyReceivedForOffset == offset; + } + + boolean isLastChunk(int index) { + return totalChunks == index; + } + + void markSendStatus(boolean success) { + if (success) { + // if the chunk sent was successful + replyReceivedForOffset = offset; + replyStatus = true; + lastChunkHashCode = nextChunkHashCode; + } else { + // if the chunk sent was failure + replyReceivedForOffset = offset; + replyStatus = false; + } + } + + byte[] getNextChunk() { + int snapshotLength = getSnapshotBytes().size(); + int start = incrementOffset(); + int size = snapshotChunkSize; + if (snapshotChunkSize > snapshotLength) { + size = snapshotLength; + } else if ((start + snapshotChunkSize) > snapshotLength) { + size = snapshotLength - start; + } + + byte[] nextChunk = new byte[size]; + getSnapshotBytes().copyTo(nextChunk, start, 0, size); + nextChunkHashCode = Arrays.hashCode(nextChunk); + + LOG.debug("{}: Next chunk: total length={}, offset={}, size={}, hashCode={}", logName, + snapshotLength, start, size, nextChunkHashCode); + return nextChunk; + } + + /** + * reset should be called when the Follower needs to be sent the snapshot from the beginning + */ + void reset(){ + offset = 0; + replyStatus = false; + replyReceivedForOffset = offset; + chunkIndex = FIRST_CHUNK_INDEX; + lastChunkHashCode = INITIAL_LAST_CHUNK_HASH_CODE; + } + + int getLastChunkHashCode() { + return lastChunkHashCode; + } +} diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/SnapshotTracker.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/SnapshotTracker.java index 9249142874..fadca3b15d 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/SnapshotTracker.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/SnapshotTracker.java @@ -22,9 +22,9 @@ public class SnapshotTracker { private final int totalChunks; private final String leaderId; private ByteString collectedChunks = ByteString.EMPTY; - private int lastChunkIndex = AbstractLeader.FIRST_CHUNK_INDEX - 1; + private int lastChunkIndex = LeaderInstallSnapshotState.FIRST_CHUNK_INDEX - 1; private boolean sealed = false; - private int lastChunkHashCode = AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE; + private int lastChunkHashCode = LeaderInstallSnapshotState.INITIAL_LAST_CHUNK_HASH_CODE; SnapshotTracker(Logger LOG, int totalChunks, String leaderId) { this.LOG = LOG; diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java index eb81e512f3..41278799ed 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java @@ -51,7 +51,6 @@ import org.opendaylight.controller.cluster.raft.base.messages.Replicate; import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat; import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot; import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow; -import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader.FollowerToSnapshot; import org.opendaylight.controller.cluster.raft.messages.AppendEntries; import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply; import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot; @@ -585,7 +584,8 @@ public class LeaderTest extends AbstractLeaderTest { ByteString bs = toByteString(leadersSnapshot); leader.setSnapshot(Snapshot.create(bs.toByteArray(), Collections.emptyList(), commitIndex, snapshotTerm, commitIndex, snapshotTerm)); - FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs); + LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState(bs, + actorContext.getConfigParams().getSnapshotChunkSize(), leader.logName()); leader.setFollowerSnapshot(FOLLOWER_ID, fts); //send first chunk and no InstallSnapshotReply received yet @@ -918,7 +918,8 @@ public class LeaderTest extends AbstractLeaderTest { ByteString bs = toByteString(leadersSnapshot); leader.setSnapshot(Snapshot.create(bs.toByteArray(), Collections.emptyList(), commitIndex, snapshotTerm, commitIndex, snapshotTerm)); - FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs); + LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState(bs, + actorContext.getConfigParams().getSnapshotChunkSize(), leader.logName()); leader.setFollowerSnapshot(FOLLOWER_ID, fts); while(!fts.isLastChunk(fts.getChunkIndex())) { fts.getNextChunk(); @@ -1128,7 +1129,8 @@ public class LeaderTest extends AbstractLeaderTest { assertEquals(1, installSnapshot.getChunkIndex()); assertEquals(3, installSnapshot.getTotalChunks()); - assertEquals(AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE, installSnapshot.getLastChunkHashCode().get().intValue()); + assertEquals(LeaderInstallSnapshotState.INITIAL_LAST_CHUNK_HASH_CODE, + installSnapshot.getLastChunkHashCode().get().intValue()); int hashCode = Arrays.hashCode(installSnapshot.getData()); @@ -1167,7 +1169,8 @@ public class LeaderTest extends AbstractLeaderTest { ByteString bs = toByteString(leadersSnapshot); byte[] barray = bs.toByteArray(); - FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs); + LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState(bs, + actorContext.getConfigParams().getSnapshotChunkSize(), leader.logName()); leader.setFollowerSnapshot(FOLLOWER_ID, fts); assertEquals(bs.size(), barray.length); diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/SnapshotTrackerTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/SnapshotTrackerTest.java index c1bc215b6b..6816c9505e 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/SnapshotTrackerTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/SnapshotTrackerTest.java @@ -71,7 +71,7 @@ public class SnapshotTrackerTest { SnapshotTracker tracker3 = new SnapshotTracker(logger, 2, "leader"); try { - tracker3.addChunk(AbstractLeader.FIRST_CHUNK_INDEX - 1, chunk1, Optional.absent()); + tracker3.addChunk(LeaderInstallSnapshotState.FIRST_CHUNK_INDEX - 1, chunk1, Optional.absent()); Assert.fail(); } catch(SnapshotTracker.InvalidChunkException e){ @@ -80,10 +80,10 @@ public class SnapshotTrackerTest { // Out of sequence chunk indexes won't work SnapshotTracker tracker4 = new SnapshotTracker(logger, 2, "leader"); - tracker4.addChunk(AbstractLeader.FIRST_CHUNK_INDEX, chunk1, Optional.absent()); + tracker4.addChunk(LeaderInstallSnapshotState.FIRST_CHUNK_INDEX, chunk1, Optional.absent()); try { - tracker4.addChunk(AbstractLeader.FIRST_CHUNK_INDEX+2, chunk2, Optional.absent()); + tracker4.addChunk(LeaderInstallSnapshotState.FIRST_CHUNK_INDEX+2, chunk2, Optional.absent()); Assert.fail(); } catch(SnapshotTracker.InvalidChunkException e){ @@ -93,19 +93,19 @@ public class SnapshotTrackerTest { // If the lastChunkHashCode is missing SnapshotTracker tracker5 = new SnapshotTracker(logger, 2, "leader"); - tracker5.addChunk(AbstractLeader.FIRST_CHUNK_INDEX, chunk1, Optional.absent()); + tracker5.addChunk(LeaderInstallSnapshotState.FIRST_CHUNK_INDEX, chunk1, Optional.absent()); // Look I can add the same chunk again - tracker5.addChunk(AbstractLeader.FIRST_CHUNK_INDEX + 1, chunk1, Optional.absent()); + tracker5.addChunk(LeaderInstallSnapshotState.FIRST_CHUNK_INDEX + 1, chunk1, Optional.absent()); // An exception will be thrown when an invalid chunk is addedd with the right sequence // when the lastChunkHashCode is present SnapshotTracker tracker6 = new SnapshotTracker(logger, 2, "leader"); - tracker6.addChunk(AbstractLeader.FIRST_CHUNK_INDEX, chunk1, Optional.of(-1)); + tracker6.addChunk(LeaderInstallSnapshotState.FIRST_CHUNK_INDEX, chunk1, Optional.of(-1)); try { // Here we add a second chunk and tell addChunk that the previous chunk had a hash code 777 - tracker6.addChunk(AbstractLeader.FIRST_CHUNK_INDEX + 1, chunk2, Optional.of(777)); + tracker6.addChunk(LeaderInstallSnapshotState.FIRST_CHUNK_INDEX + 1, chunk2, Optional.of(777)); Assert.fail(); }catch(SnapshotTracker.InvalidChunkException e){ @@ -129,7 +129,7 @@ public class SnapshotTrackerTest { SnapshotTracker tracker2 = new SnapshotTracker(logger, 3, "leader"); - tracker2.addChunk(1, chunk1, Optional.of(AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE)); + tracker2.addChunk(1, chunk1, Optional.of(LeaderInstallSnapshotState.INITIAL_LAST_CHUNK_HASH_CODE)); tracker2.addChunk(2, chunk2, Optional.of(Arrays.hashCode(chunk1))); tracker2.addChunk(3, chunk3, Optional.of(Arrays.hashCode(chunk2))); @@ -144,7 +144,7 @@ public class SnapshotTrackerTest { ByteString chunks = ByteString.copyFrom(chunk1).concat(ByteString.copyFrom(chunk2)); - tracker1.addChunk(1, chunk1, Optional.of(AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE)); + tracker1.addChunk(1, chunk1, Optional.of(LeaderInstallSnapshotState.INITIAL_LAST_CHUNK_HASH_CODE)); tracker1.addChunk(2, chunk2, Optional.of(Arrays.hashCode(chunk1))); assertEquals(chunks, tracker1.getCollectedChunks());