- return (minPresent != 0);
- }
-
- /**
- * Encapsulates the snapshot bytestring and handles the logic of sending
- * snapshot chunks
- */
- protected class FollowerToSnapshot {
- private final ByteString snapshotBytes;
- private int offset = 0;
- // the next snapshot chunk is sent only if the replyReceivedForOffset matches offset
- private int replyReceivedForOffset;
- // if replyStatus is false, the previous chunk is attempted
- private boolean replyStatus = false;
- private int chunkIndex;
- private final int totalChunks;
- private int lastChunkHashCode = AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE;
- private int nextChunkHashCode = AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE;
-
- public FollowerToSnapshot(ByteString snapshotBytes) {
- this.snapshotBytes = snapshotBytes;
- int size = snapshotBytes.size();
- totalChunks = ( size / context.getConfigParams().getSnapshotChunkSize()) +
- ((size % context.getConfigParams().getSnapshotChunkSize()) > 0 ? 1 : 0);
- if(LOG.isDebugEnabled()) {
- LOG.debug("{}: Snapshot {} bytes, total chunks to send:{}",
- logName(), size, totalChunks);
- }
- replyReceivedForOffset = -1;
- chunkIndex = AbstractLeader.FIRST_CHUNK_INDEX;
- }
-
- public ByteString getSnapshotBytes() {
- return snapshotBytes;
- }
-
- public int incrementOffset() {
- if(replyStatus) {
- // if prev chunk failed, we would want to sent the same chunk again
- offset = offset + context.getConfigParams().getSnapshotChunkSize();
- }
- return offset;
- }
-
- public int incrementChunkIndex() {
- if (replyStatus) {
- // if prev chunk failed, we would want to sent the same chunk again
- chunkIndex = chunkIndex + 1;
- }
- return chunkIndex;
- }
-
- public int getChunkIndex() {
- return chunkIndex;
- }
-
- public int getTotalChunks() {
- return totalChunks;
- }
-
- public boolean canSendNextChunk() {
- // we only send a false if a chunk is sent but we have not received a reply yet
- return replyReceivedForOffset == offset;
- }
-
- public boolean isLastChunk(int chunkIndex) {
- return totalChunks == chunkIndex;
- }
-
- public void markSendStatus(boolean success) {
- if (success) {
- // if the chunk sent was successful
- replyReceivedForOffset = offset;
- replyStatus = true;
- lastChunkHashCode = nextChunkHashCode;
- } else {
- // if the chunk sent was failure
- replyReceivedForOffset = offset;
- replyStatus = false;
- }
- }
-
- public ByteString getNextChunk() {
- int snapshotLength = getSnapshotBytes().size();
- int start = incrementOffset();
- int size = context.getConfigParams().getSnapshotChunkSize();
- if (context.getConfigParams().getSnapshotChunkSize() > snapshotLength) {
- size = snapshotLength;
- } else {
- if ((start + context.getConfigParams().getSnapshotChunkSize()) > snapshotLength) {
- size = snapshotLength - start;
- }
- }
-
-
- LOG.debug("{}: Next chunk: length={}, offset={},size={}", logName(),
- snapshotLength, start, size);
-
- ByteString substring = getSnapshotBytes().substring(start, start + size);
- nextChunkHashCode = substring.hashCode();
- return substring;
- }
-
- /**
- * reset should be called when the Follower needs to be sent the snapshot from the beginning
- */
- public void reset(){
- offset = 0;
- replyStatus = false;
- replyReceivedForOffset = offset;
- chunkIndex = AbstractLeader.FIRST_CHUNK_INDEX;
- lastChunkHashCode = AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE;
- }
-
- public int getLastChunkHashCode() {
- return lastChunkHashCode;
- }