import com.google.common.base.Preconditions;
import com.google.protobuf.ByteString;
import java.io.IOException;
-import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
* set commitIndex = N (§5.3, §5.4).
*/
public abstract class AbstractLeader extends AbstractRaftActorBehavior {
-
- // The index of the first chunk that is sent when installing a snapshot
- public static final int FIRST_CHUNK_INDEX = 1;
-
- // The index that the follower should respond with if it needs the install snapshot to be reset
- public static final int INVALID_CHUNK_INDEX = -1;
-
- // This would be passed as the hash code of the last chunk when sending the first chunk
- public static final int INITIAL_LAST_CHUNK_HASH_CODE = -1;
-
private final Map<String, FollowerLogInformation> followerToLog = new HashMap<>();
- private final Map<String, FollowerToSnapshot> mapFollowerToSnapshot = new HashMap<>();
+ private final Map<String, LeaderInstallSnapshotState> mapFollowerToSnapshot = new HashMap<>();
/**
* Lookup table for request contexts based on journal index. We could use a {@link Map} here, but we really
LOG.debug("{}: handleInstallSnapshotReply: {}", logName(), reply);
String followerId = reply.getFollowerId();
- FollowerToSnapshot followerToSnapshot = mapFollowerToSnapshot.get(followerId);
+ LeaderInstallSnapshotState followerToSnapshot = mapFollowerToSnapshot.get(followerId);
if (followerToSnapshot == null) {
LOG.error("{}: FollowerToSnapshot not found for follower {} in InstallSnapshotReply",
logName(), reply.getChunkIndex(), followerId,
followerToSnapshot.getChunkIndex());
- if(reply.getChunkIndex() == INVALID_CHUNK_INDEX){
+ if(reply.getChunkIndex() == LeaderInstallSnapshotState.INVALID_CHUNK_INDEX){
// Since the Follower did not find this index to be valid we should reset the follower snapshot
// so that Installing the snapshot can resume from the beginning
followerToSnapshot.reset();
// Note: the previous call to getNextSnapshotChunk has the side-effect of adding
// followerId to the followerToSnapshot map.
- FollowerToSnapshot followerToSnapshot = mapFollowerToSnapshot.get(followerId);
+ LeaderInstallSnapshotState followerToSnapshot = mapFollowerToSnapshot.get(followerId);
int nextChunkIndex = followerToSnapshot.incrementChunkIndex();
Optional<ServerConfigurationPayload> serverConfig = Optional.absent();
* creates and return a ByteString chunk
*/
private byte[] getNextSnapshotChunk(String followerId, ByteString snapshotBytes) throws IOException {
- FollowerToSnapshot followerToSnapshot = mapFollowerToSnapshot.get(followerId);
+ LeaderInstallSnapshotState followerToSnapshot = mapFollowerToSnapshot.get(followerId);
if (followerToSnapshot == null) {
- followerToSnapshot = new FollowerToSnapshot(snapshotBytes);
+ followerToSnapshot = new LeaderInstallSnapshotState(snapshotBytes, context.getConfigParams().getSnapshotChunkSize(),
+ logName());
mapFollowerToSnapshot.put(followerId, followerToSnapshot);
}
byte[] nextChunk = followerToSnapshot.getNextChunk();
return minPresent != 0;
}
- /**
- * Encapsulates the snapshot bytestring and handles the logic of sending
- * snapshot chunks
- */
- protected class FollowerToSnapshot {
- private final ByteString snapshotBytes;
- private int offset = 0;
- // the next snapshot chunk is sent only if the replyReceivedForOffset matches offset
- private int replyReceivedForOffset;
- // if replyStatus is false, the previous chunk is attempted
- private boolean replyStatus = false;
- private int chunkIndex;
- private final int totalChunks;
- private int lastChunkHashCode = AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE;
- private int nextChunkHashCode = AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE;
-
- public FollowerToSnapshot(ByteString snapshotBytes) {
- this.snapshotBytes = snapshotBytes;
- int size = snapshotBytes.size();
- totalChunks = (size / context.getConfigParams().getSnapshotChunkSize()) +
- (size % context.getConfigParams().getSnapshotChunkSize() > 0 ? 1 : 0);
- if(LOG.isDebugEnabled()) {
- LOG.debug("{}: Snapshot {} bytes, total chunks to send:{}",
- logName(), size, totalChunks);
- }
- replyReceivedForOffset = -1;
- chunkIndex = AbstractLeader.FIRST_CHUNK_INDEX;
- }
-
- public ByteString getSnapshotBytes() {
- return snapshotBytes;
- }
-
- public int incrementOffset() {
- if(replyStatus) {
- // if prev chunk failed, we would want to sent the same chunk again
- offset = offset + context.getConfigParams().getSnapshotChunkSize();
- }
- return offset;
- }
-
- public int incrementChunkIndex() {
- if (replyStatus) {
- // if prev chunk failed, we would want to sent the same chunk again
- chunkIndex = chunkIndex + 1;
- }
- return chunkIndex;
- }
-
- public int getChunkIndex() {
- return chunkIndex;
- }
-
- public int getTotalChunks() {
- return totalChunks;
- }
-
- public boolean canSendNextChunk() {
- // we only send a false if a chunk is sent but we have not received a reply yet
- return replyReceivedForOffset == offset;
- }
-
- public boolean isLastChunk(int chunkIndex) {
- return totalChunks == chunkIndex;
- }
-
- public void markSendStatus(boolean success) {
- if (success) {
- // if the chunk sent was successful
- replyReceivedForOffset = offset;
- replyStatus = true;
- lastChunkHashCode = nextChunkHashCode;
- } else {
- // if the chunk sent was failure
- replyReceivedForOffset = offset;
- replyStatus = false;
- }
- }
-
- public byte[] getNextChunk() {
- int snapshotLength = getSnapshotBytes().size();
- int start = incrementOffset();
- int size = context.getConfigParams().getSnapshotChunkSize();
- if (context.getConfigParams().getSnapshotChunkSize() > snapshotLength) {
- size = snapshotLength;
- } else if ((start + context.getConfigParams().getSnapshotChunkSize()) > snapshotLength) {
- size = snapshotLength - start;
- }
-
- byte[] nextChunk = new byte[size];
- getSnapshotBytes().copyTo(nextChunk, start, 0, size);
- nextChunkHashCode = Arrays.hashCode(nextChunk);
-
- LOG.debug("{}: Next chunk: total length={}, offset={}, size={}, hashCode={}", logName(),
- snapshotLength, start, size, nextChunkHashCode);
- return nextChunk;
- }
-
- /**
- * reset should be called when the Follower needs to be sent the snapshot from the beginning
- */
- public void reset(){
- offset = 0;
- replyStatus = false;
- replyReceivedForOffset = offset;
- chunkIndex = AbstractLeader.FIRST_CHUNK_INDEX;
- lastChunkHashCode = AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE;
- }
-
- public int getLastChunkHashCode() {
- return lastChunkHashCode;
- }
- }
-
// called from example-actor for printing the follower-states
public String printFollowerStates() {
final StringBuilder sb = new StringBuilder();
}
@VisibleForTesting
- protected void setFollowerSnapshot(String followerId, FollowerToSnapshot snapshot) {
+ protected void setFollowerSnapshot(String followerId, LeaderInstallSnapshotState snapshot) {
mapFollowerToSnapshot.put(followerId, snapshot);
}