X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2Fbehaviors%2FLeaderInstallSnapshotState.java;h=758345214841ba62ab41af0827427a505562198f;hp=81c3eff45180b70b6c5001563bfba394dbc87c4e;hb=refs%2Fchanges%2F25%2F82725%2F1;hpb=95d3c7975a423951dcbdecfbfa4cb6b7a23591cc diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderInstallSnapshotState.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderInstallSnapshotState.java index 81c3eff451..7583452148 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderInstallSnapshotState.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderInstallSnapshotState.java @@ -7,15 +7,21 @@ */ package org.opendaylight.controller.cluster.raft.behaviors; -import com.google.protobuf.ByteString; +import com.google.common.base.MoreObjects; +import com.google.common.base.Stopwatch; +import com.google.common.io.ByteSource; +import java.io.IOException; +import java.io.InputStream; import java.util.Arrays; +import java.util.concurrent.TimeUnit; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import scala.concurrent.duration.FiniteDuration; /** * Encapsulates the leader state and logic for sending snapshot chunks to a follower. */ -public final class LeaderInstallSnapshotState { +public final class LeaderInstallSnapshotState implements AutoCloseable { private static final Logger LOG = LoggerFactory.getLogger(LeaderInstallSnapshotState.class); // The index of the first chunk that is sent when installing a snapshot @@ -27,40 +33,53 @@ public final class LeaderInstallSnapshotState { // This would be passed as the hash code of the last chunk when sending the first chunk static final int INITIAL_LAST_CHUNK_HASH_CODE = -1; - private int snapshotChunkSize; - private final ByteString snapshotBytes; + private final int snapshotChunkSize; private final String logName; + private ByteSource snapshotBytes; private int offset = 0; // the next snapshot chunk is sent only if the replyReceivedForOffset matches offset - private int replyReceivedForOffset; + private int replyReceivedForOffset = -1; // if replyStatus is false, the previous chunk is attempted private boolean replyStatus = false; - private int chunkIndex; - private final int totalChunks; + private int chunkIndex = FIRST_CHUNK_INDEX; + private int totalChunks; private int lastChunkHashCode = INITIAL_LAST_CHUNK_HASH_CODE; private int nextChunkHashCode = INITIAL_LAST_CHUNK_HASH_CODE; + private long snapshotSize; + private InputStream snapshotInputStream; + private Stopwatch chunkTimer = Stopwatch.createUnstarted(); + private byte[] currentChunk = null; - LeaderInstallSnapshotState(ByteString snapshotBytes, int snapshotChunkSize, String logName) { + LeaderInstallSnapshotState(final int snapshotChunkSize, final String logName) { this.snapshotChunkSize = snapshotChunkSize; - this.snapshotBytes = snapshotBytes; this.logName = logName; - int size = snapshotBytes.size(); - totalChunks = size / snapshotChunkSize + - (size % snapshotChunkSize > 0 ? 1 : 0); + } + + void setSnapshotBytes(final ByteSource snapshotBytes) throws IOException { + if (this.snapshotBytes != null) { + return; + } + + snapshotSize = snapshotBytes.size(); + snapshotInputStream = snapshotBytes.openStream(); + + this.snapshotBytes = snapshotBytes; + + totalChunks = (int) (snapshotSize / snapshotChunkSize + (snapshotSize % snapshotChunkSize > 0 ? 1 : 0)); - LOG.debug("{}: Snapshot {} bytes, total chunks to send: {}", logName, size, totalChunks); + LOG.debug("{}: Snapshot {} bytes, total chunks to send: {}", logName, snapshotSize, totalChunks); replyReceivedForOffset = -1; chunkIndex = FIRST_CHUNK_INDEX; } - ByteString getSnapshotBytes() { - return snapshotBytes; - } - int incrementOffset() { - if(replyStatus) { - // if prev chunk failed, we would want to sent the same chunk again + // if first chunk was retried, reset offset back to initial 0 + if (offset == -1) { + offset = 0; + } + if (replyStatus) { + // if prev chunk failed, we would want to send the same chunk again offset = offset + snapshotChunkSize; } return offset; @@ -68,12 +87,24 @@ public final class LeaderInstallSnapshotState { int incrementChunkIndex() { if (replyStatus) { - // if prev chunk failed, we would want to sent the same chunk again + // if prev chunk failed, we would want to send the same chunk again chunkIndex = chunkIndex + 1; } return chunkIndex; } + void startChunkTimer() { + chunkTimer.start(); + } + + void resetChunkTimer() { + chunkTimer.reset(); + } + + boolean isChunkTimedOut(final FiniteDuration timeout) { + return chunkTimer.elapsed(TimeUnit.SECONDS) > timeout.toSeconds(); + } + int getChunkIndex() { return chunkIndex; } @@ -84,57 +115,112 @@ public final class LeaderInstallSnapshotState { boolean canSendNextChunk() { // we only send a false if a chunk is sent but we have not received a reply yet - return replyReceivedForOffset == offset; + return snapshotBytes != null && (nextChunkHashCode == INITIAL_LAST_CHUNK_HASH_CODE + || replyReceivedForOffset == offset); } - boolean isLastChunk(int index) { + boolean isLastChunk(final int index) { return totalChunks == index; } - void markSendStatus(boolean success) { + void markSendStatus(final boolean success) { if (success) { // if the chunk sent was successful replyReceivedForOffset = offset; replyStatus = true; lastChunkHashCode = nextChunkHashCode; } else { - // if the chunk sent was failure - replyReceivedForOffset = offset; + // if the chunk sent was failure, revert offset to previous so we can retry + offset = replyReceivedForOffset; replyStatus = false; } } - byte[] getNextChunk() { - int snapshotLength = getSnapshotBytes().size(); + byte[] getNextChunk() throws IOException { + // increment offset to indicate next chunk is in flight, canSendNextChunk() wont let us hit this again until, + // markSendStatus() is called with either success or failure int start = incrementOffset(); - int size = snapshotChunkSize; - if (snapshotChunkSize > snapshotLength) { - size = snapshotLength; - } else if (start + snapshotChunkSize > snapshotLength) { - size = snapshotLength - start; + if (replyStatus || currentChunk == null) { + int size = snapshotChunkSize; + if (snapshotChunkSize > snapshotSize) { + size = (int) snapshotSize; + } else if (start + snapshotChunkSize > snapshotSize) { + size = (int) (snapshotSize - start); + } + + currentChunk = new byte[size]; + int numRead = snapshotInputStream.read(currentChunk); + if (numRead != size) { + throw new IOException(String.format( + "The # of bytes read from the input stream, %d," + + "does not match the expected # %d", numRead, size)); + } + + nextChunkHashCode = Arrays.hashCode(currentChunk); + + LOG.debug("{}: Next chunk: total length={}, offset={}, size={}, hashCode={}", logName, + snapshotSize, start, size, nextChunkHashCode); } - byte[] nextChunk = new byte[size]; - getSnapshotBytes().copyTo(nextChunk, start, 0, size); - nextChunkHashCode = Arrays.hashCode(nextChunk); - - LOG.debug("{}: Next chunk: total length={}, offset={}, size={}, hashCode={}", logName, - snapshotLength, start, size, nextChunkHashCode); - return nextChunk; + return currentChunk; } /** - * reset should be called when the Follower needs to be sent the snapshot from the beginning + * Reset should be called when the Follower needs to be sent the snapshot from the beginning. */ - void reset(){ + void reset() { + closeStream(); + chunkTimer.reset(); + offset = 0; replyStatus = false; replyReceivedForOffset = offset; chunkIndex = FIRST_CHUNK_INDEX; + currentChunk = null; lastChunkHashCode = INITIAL_LAST_CHUNK_HASH_CODE; + + try { + snapshotInputStream = snapshotBytes.openStream(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public void close() { + closeStream(); + snapshotBytes = null; + } + + private void closeStream() { + if (snapshotInputStream != null) { + try { + snapshotInputStream.close(); + } catch (IOException e) { + LOG.warn("{}: Error closing snapshot stream", logName); + } + + snapshotInputStream = null; + } } int getLastChunkHashCode() { return lastChunkHashCode; } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("snapshotChunkSize", snapshotChunkSize) + .add("offset", offset) + .add("replyReceivedForOffset", replyReceivedForOffset) + .add("replyStatus", replyStatus) + .add("chunkIndex", chunkIndex) + .add("totalChunks", totalChunks) + .add("lastChunkHashCode", lastChunkHashCode) + .add("nextChunkHashCode", nextChunkHashCode) + .add("snapshotSize", snapshotSize) + .add("chunkTimer", chunkTimer) + .toString(); + } }