X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2Fbehaviors%2FLeaderInstallSnapshotState.java;h=758345214841ba62ab41af0827427a505562198f;hp=946c56bec091e6dba8c490a1c9ede601e8d7e532;hb=refs%2Fchanges%2F25%2F82725%2F1;hpb=b5cb353e3553a39f576c284119af75ffa5ea66a9 diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderInstallSnapshotState.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderInstallSnapshotState.java index 946c56bec0..7583452148 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderInstallSnapshotState.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderInstallSnapshotState.java @@ -7,12 +7,16 @@ */ package org.opendaylight.controller.cluster.raft.behaviors; +import com.google.common.base.MoreObjects; +import com.google.common.base.Stopwatch; import com.google.common.io.ByteSource; import java.io.IOException; import java.io.InputStream; import java.util.Arrays; +import java.util.concurrent.TimeUnit; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import scala.concurrent.duration.FiniteDuration; /** * Encapsulates the leader state and logic for sending snapshot chunks to a follower. @@ -43,6 +47,8 @@ public final class LeaderInstallSnapshotState implements AutoCloseable { private int nextChunkHashCode = INITIAL_LAST_CHUNK_HASH_CODE; private long snapshotSize; private InputStream snapshotInputStream; + private Stopwatch chunkTimer = Stopwatch.createUnstarted(); + private byte[] currentChunk = null; LeaderInstallSnapshotState(final int snapshotChunkSize, final String logName) { this.snapshotChunkSize = snapshotChunkSize; @@ -68,8 +74,12 @@ public final class LeaderInstallSnapshotState implements AutoCloseable { } int incrementOffset() { + // if first chunk was retried, reset offset back to initial 0 + if (offset == -1) { + offset = 0; + } if (replyStatus) { - // if prev chunk failed, we would want to sent the same chunk again + // if prev chunk failed, we would want to send the same chunk again offset = offset + snapshotChunkSize; } return offset; @@ -77,12 +87,24 @@ public final class LeaderInstallSnapshotState implements AutoCloseable { int incrementChunkIndex() { if (replyStatus) { - // if prev chunk failed, we would want to sent the same chunk again + // if prev chunk failed, we would want to send the same chunk again chunkIndex = chunkIndex + 1; } return chunkIndex; } + void startChunkTimer() { + chunkTimer.start(); + } + + void resetChunkTimer() { + chunkTimer.reset(); + } + + boolean isChunkTimedOut(final FiniteDuration timeout) { + return chunkTimer.elapsed(TimeUnit.SECONDS) > timeout.toSeconds(); + } + int getChunkIndex() { return chunkIndex; } @@ -108,33 +130,39 @@ public final class LeaderInstallSnapshotState implements AutoCloseable { replyStatus = true; lastChunkHashCode = nextChunkHashCode; } else { - // if the chunk sent was failure - replyReceivedForOffset = offset; + // if the chunk sent was failure, revert offset to previous so we can retry + offset = replyReceivedForOffset; replyStatus = false; } } byte[] getNextChunk() throws IOException { + // increment offset to indicate next chunk is in flight, canSendNextChunk() wont let us hit this again until, + // markSendStatus() is called with either success or failure int start = incrementOffset(); - int size = snapshotChunkSize; - if (snapshotChunkSize > snapshotSize) { - size = (int) snapshotSize; - } else if (start + snapshotChunkSize > snapshotSize) { - size = (int) (snapshotSize - start); - } + if (replyStatus || currentChunk == null) { + int size = snapshotChunkSize; + if (snapshotChunkSize > snapshotSize) { + size = (int) snapshotSize; + } else if (start + snapshotChunkSize > snapshotSize) { + size = (int) (snapshotSize - start); + } - byte[] nextChunk = new byte[size]; - int numRead = snapshotInputStream.read(nextChunk); - if (numRead != size) { - throw new IOException(String.format( - "The # of bytes read from the input stream, %d, does not match the expected # %d", numRead, size)); - } + currentChunk = new byte[size]; + int numRead = snapshotInputStream.read(currentChunk); + if (numRead != size) { + throw new IOException(String.format( + "The # of bytes read from the input stream, %d," + + "does not match the expected # %d", numRead, size)); + } + + nextChunkHashCode = Arrays.hashCode(currentChunk); - nextChunkHashCode = Arrays.hashCode(nextChunk); + LOG.debug("{}: Next chunk: total length={}, offset={}, size={}, hashCode={}", logName, + snapshotSize, start, size, nextChunkHashCode); + } - LOG.debug("{}: Next chunk: total length={}, offset={}, size={}, hashCode={}", logName, - snapshotSize, start, size, nextChunkHashCode); - return nextChunk; + return currentChunk; } /** @@ -142,11 +170,13 @@ public final class LeaderInstallSnapshotState implements AutoCloseable { */ void reset() { closeStream(); + chunkTimer.reset(); offset = 0; replyStatus = false; replyReceivedForOffset = offset; chunkIndex = FIRST_CHUNK_INDEX; + currentChunk = null; lastChunkHashCode = INITIAL_LAST_CHUNK_HASH_CODE; try { @@ -177,4 +207,20 @@ public final class LeaderInstallSnapshotState implements AutoCloseable { int getLastChunkHashCode() { return lastChunkHashCode; } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("snapshotChunkSize", snapshotChunkSize) + .add("offset", offset) + .add("replyReceivedForOffset", replyReceivedForOffset) + .add("replyStatus", replyStatus) + .add("chunkIndex", chunkIndex) + .add("totalChunks", totalChunks) + .add("lastChunkHashCode", lastChunkHashCode) + .add("nextChunkHashCode", nextChunkHashCode) + .add("snapshotSize", snapshotSize) + .add("chunkTimer", chunkTimer) + .toString(); + } }