X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2Fbehaviors%2FLeaderInstallSnapshotState.java;h=cc4caa32ebacc8511fd4ea610ea6af2af2e53bf4;hb=refs%2Fchanges%2F17%2F83317%2F7;hp=5d47dbd02ec9754e94e5bf0f3c81e1a769caf6ba;hpb=5fd4213b5bfaf2db21f1b37139f6b98535a872c0;p=controller.git diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderInstallSnapshotState.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderInstallSnapshotState.java index 5d47dbd02e..cc4caa32eb 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderInstallSnapshotState.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderInstallSnapshotState.java @@ -7,13 +7,16 @@ */ package org.opendaylight.controller.cluster.raft.behaviors; -import com.google.common.base.Throwables; +import com.google.common.base.MoreObjects; +import com.google.common.base.Stopwatch; import com.google.common.io.ByteSource; import java.io.IOException; import java.io.InputStream; import java.util.Arrays; +import java.util.concurrent.TimeUnit; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import scala.concurrent.duration.FiniteDuration; /** * Encapsulates the leader state and logic for sending snapshot chunks to a follower. @@ -27,13 +30,15 @@ public final class LeaderInstallSnapshotState implements AutoCloseable { // The index that the follower should respond with if it needs the install snapshot to be reset static final int INVALID_CHUNK_INDEX = -1; + static final int INITIAL_OFFSET = -1; + // This would be passed as the hash code of the last chunk when sending the first chunk static final int INITIAL_LAST_CHUNK_HASH_CODE = -1; private final int snapshotChunkSize; private final String logName; private ByteSource snapshotBytes; - private int offset = 0; + private int offset = INITIAL_OFFSET; // the next snapshot chunk is sent only if the replyReceivedForOffset matches offset private int replyReceivedForOffset = -1; // if replyStatus is false, the previous chunk is attempted @@ -44,13 +49,15 @@ public final class LeaderInstallSnapshotState implements AutoCloseable { private int nextChunkHashCode = INITIAL_LAST_CHUNK_HASH_CODE; private long snapshotSize; private InputStream snapshotInputStream; + private Stopwatch chunkTimer = Stopwatch.createUnstarted(); + private byte[] currentChunk = null; - LeaderInstallSnapshotState(int snapshotChunkSize, String logName) { + LeaderInstallSnapshotState(final int snapshotChunkSize, final String logName) { this.snapshotChunkSize = snapshotChunkSize; this.logName = logName; } - void setSnapshotBytes(ByteSource snapshotBytes) throws IOException { + void setSnapshotBytes(final ByteSource snapshotBytes) throws IOException { if (this.snapshotBytes != null) { return; } @@ -64,13 +71,15 @@ public final class LeaderInstallSnapshotState implements AutoCloseable { LOG.debug("{}: Snapshot {} bytes, total chunks to send: {}", logName, snapshotSize, totalChunks); - replyReceivedForOffset = -1; + replyReceivedForOffset = INITIAL_OFFSET; chunkIndex = FIRST_CHUNK_INDEX; } int incrementOffset() { - if (replyStatus) { - // if prev chunk failed, we would want to sent the same chunk again + // if offset is -1 doesnt matter whether it was the initial value or reset, move the offset to 0 to begin with + if (offset == INITIAL_OFFSET) { + offset = 0; + } else { offset = offset + snapshotChunkSize; } return offset; @@ -78,12 +87,24 @@ public final class LeaderInstallSnapshotState implements AutoCloseable { int incrementChunkIndex() { if (replyStatus) { - // if prev chunk failed, we would want to sent the same chunk again + // if prev chunk failed, we would want to send the same chunk again chunkIndex = chunkIndex + 1; } return chunkIndex; } + void startChunkTimer() { + chunkTimer.start(); + } + + void resetChunkTimer() { + chunkTimer.reset(); + } + + boolean isChunkTimedOut(final FiniteDuration timeout) { + return chunkTimer.elapsed(TimeUnit.SECONDS) > timeout.toSeconds(); + } + int getChunkIndex() { return chunkIndex; } @@ -94,47 +115,54 @@ public final class LeaderInstallSnapshotState implements AutoCloseable { boolean canSendNextChunk() { // we only send a false if a chunk is sent but we have not received a reply yet - return snapshotBytes != null && replyReceivedForOffset == offset; + return snapshotBytes != null && (nextChunkHashCode == INITIAL_LAST_CHUNK_HASH_CODE + || replyReceivedForOffset == offset); } - boolean isLastChunk(int index) { + boolean isLastChunk(final int index) { return totalChunks == index; } - void markSendStatus(boolean success) { + void markSendStatus(final boolean success) { if (success) { // if the chunk sent was successful replyReceivedForOffset = offset; replyStatus = true; lastChunkHashCode = nextChunkHashCode; } else { - // if the chunk sent was failure - replyReceivedForOffset = offset; + // if the chunk sent was failure, revert offset to previous so we can retry + offset = replyReceivedForOffset; replyStatus = false; } } byte[] getNextChunk() throws IOException { + // increment offset to indicate next chunk is in flight, canSendNextChunk() wont let us hit this again until, + // markSendStatus() is called with either success or failure int start = incrementOffset(); - int size = snapshotChunkSize; - if (snapshotChunkSize > snapshotSize) { - size = (int) snapshotSize; - } else if (start + snapshotChunkSize > snapshotSize) { - size = (int) (snapshotSize - start); - } + if (replyStatus || currentChunk == null) { + int size = snapshotChunkSize; + if (snapshotChunkSize > snapshotSize) { + size = (int) snapshotSize; + } else if (start + snapshotChunkSize > snapshotSize) { + size = (int) (snapshotSize - start); + } - byte[] nextChunk = new byte[size]; - int numRead = snapshotInputStream.read(nextChunk); - if (numRead != size) { - throw new IOException(String.format( - "The # of bytes read from the imput stream, %d, does not match the expected # %d", numRead, size)); - } + currentChunk = new byte[size]; + int numRead = snapshotInputStream.read(currentChunk); + if (numRead != size) { + throw new IOException(String.format( + "The # of bytes read from the input stream, %d," + + "does not match the expected # %d", numRead, size)); + } - nextChunkHashCode = Arrays.hashCode(nextChunk); + nextChunkHashCode = Arrays.hashCode(currentChunk); - LOG.debug("{}: Next chunk: total length={}, offset={}, size={}, hashCode={}", logName, - snapshotSize, start, size, nextChunkHashCode); - return nextChunk; + LOG.debug("{}: Next chunk: total length={}, offset={}, size={}, hashCode={}", logName, + snapshotSize, start, size, nextChunkHashCode); + } + + return currentChunk; } /** @@ -142,17 +170,20 @@ public final class LeaderInstallSnapshotState implements AutoCloseable { */ void reset() { closeStream(); + chunkTimer.reset(); - offset = 0; + offset = INITIAL_OFFSET; replyStatus = false; - replyReceivedForOffset = offset; + replyReceivedForOffset = INITIAL_OFFSET; chunkIndex = FIRST_CHUNK_INDEX; + currentChunk = null; lastChunkHashCode = INITIAL_LAST_CHUNK_HASH_CODE; + nextChunkHashCode = INITIAL_LAST_CHUNK_HASH_CODE; try { snapshotInputStream = snapshotBytes.openStream(); } catch (IOException e) { - throw Throwables.propagate(e); + throw new RuntimeException(e); } } @@ -177,4 +208,20 @@ public final class LeaderInstallSnapshotState implements AutoCloseable { int getLastChunkHashCode() { return lastChunkHashCode; } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("snapshotChunkSize", snapshotChunkSize) + .add("offset", offset) + .add("replyReceivedForOffset", replyReceivedForOffset) + .add("replyStatus", replyStatus) + .add("chunkIndex", chunkIndex) + .add("totalChunks", totalChunks) + .add("lastChunkHashCode", lastChunkHashCode) + .add("nextChunkHashCode", nextChunkHashCode) + .add("snapshotSize", snapshotSize) + .add("chunkTimer", chunkTimer) + .toString(); + } }