X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2Fbehaviors%2FLeaderInstallSnapshotState.java;h=946c56bec091e6dba8c490a1c9ede601e8d7e532;hp=100c99bb5eff1f53a0826eae8b464a4746d2c267;hb=1b1360ac337d23b9a586f62616eb278c3065eef0;hpb=d3e310b940b60f6590f0e94a576aece95a055942 diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderInstallSnapshotState.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderInstallSnapshotState.java index 100c99bb5e..946c56bec0 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderInstallSnapshotState.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderInstallSnapshotState.java @@ -7,7 +7,9 @@ */ package org.opendaylight.controller.cluster.raft.behaviors; -import com.google.protobuf.ByteString; +import com.google.common.io.ByteSource; +import java.io.IOException; +import java.io.InputStream; import java.util.Arrays; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -15,7 +17,7 @@ import org.slf4j.LoggerFactory; /** * Encapsulates the leader state and logic for sending snapshot chunks to a follower. */ -class LeaderInstallSnapshotState { +public final class LeaderInstallSnapshotState implements AutoCloseable { private static final Logger LOG = LoggerFactory.getLogger(LeaderInstallSnapshotState.class); // The index of the first chunk that is sent when installing a snapshot @@ -27,39 +29,46 @@ class LeaderInstallSnapshotState { // This would be passed as the hash code of the last chunk when sending the first chunk static final int INITIAL_LAST_CHUNK_HASH_CODE = -1; - private int snapshotChunkSize; - private final ByteString snapshotBytes; + private final int snapshotChunkSize; private final String logName; + private ByteSource snapshotBytes; private int offset = 0; // the next snapshot chunk is sent only if the replyReceivedForOffset matches offset - private int replyReceivedForOffset; + private int replyReceivedForOffset = -1; // if replyStatus is false, the previous chunk is attempted private boolean replyStatus = false; - private int chunkIndex; - private final int totalChunks; + private int chunkIndex = FIRST_CHUNK_INDEX; + private int totalChunks; private int lastChunkHashCode = INITIAL_LAST_CHUNK_HASH_CODE; private int nextChunkHashCode = INITIAL_LAST_CHUNK_HASH_CODE; + private long snapshotSize; + private InputStream snapshotInputStream; - public LeaderInstallSnapshotState(ByteString snapshotBytes, int snapshotChunkSize, String logName) { + LeaderInstallSnapshotState(final int snapshotChunkSize, final String logName) { this.snapshotChunkSize = snapshotChunkSize; - this.snapshotBytes = snapshotBytes; this.logName = logName; - int size = snapshotBytes.size(); - totalChunks = (size / snapshotChunkSize) + - (size % snapshotChunkSize > 0 ? 1 : 0); + } + + void setSnapshotBytes(final ByteSource snapshotBytes) throws IOException { + if (this.snapshotBytes != null) { + return; + } + + snapshotSize = snapshotBytes.size(); + snapshotInputStream = snapshotBytes.openStream(); - LOG.debug("{}: Snapshot {} bytes, total chunks to send: {}", logName, size, totalChunks); + this.snapshotBytes = snapshotBytes; + + totalChunks = (int) (snapshotSize / snapshotChunkSize + (snapshotSize % snapshotChunkSize > 0 ? 1 : 0)); + + LOG.debug("{}: Snapshot {} bytes, total chunks to send: {}", logName, snapshotSize, totalChunks); replyReceivedForOffset = -1; chunkIndex = FIRST_CHUNK_INDEX; } - ByteString getSnapshotBytes() { - return snapshotBytes; - } - int incrementOffset() { - if(replyStatus) { + if (replyStatus) { // if prev chunk failed, we would want to sent the same chunk again offset = offset + snapshotChunkSize; } @@ -84,14 +93,15 @@ class LeaderInstallSnapshotState { boolean canSendNextChunk() { // we only send a false if a chunk is sent but we have not received a reply yet - return replyReceivedForOffset == offset; + return snapshotBytes != null && (nextChunkHashCode == INITIAL_LAST_CHUNK_HASH_CODE + || replyReceivedForOffset == offset); } - boolean isLastChunk(int index) { + boolean isLastChunk(final int index) { return totalChunks == index; } - void markSendStatus(boolean success) { + void markSendStatus(final boolean success) { if (success) { // if the chunk sent was successful replyReceivedForOffset = offset; @@ -104,34 +114,64 @@ class LeaderInstallSnapshotState { } } - byte[] getNextChunk() { - int snapshotLength = getSnapshotBytes().size(); + byte[] getNextChunk() throws IOException { int start = incrementOffset(); int size = snapshotChunkSize; - if (snapshotChunkSize > snapshotLength) { - size = snapshotLength; - } else if ((start + snapshotChunkSize) > snapshotLength) { - size = snapshotLength - start; + if (snapshotChunkSize > snapshotSize) { + size = (int) snapshotSize; + } else if (start + snapshotChunkSize > snapshotSize) { + size = (int) (snapshotSize - start); } byte[] nextChunk = new byte[size]; - getSnapshotBytes().copyTo(nextChunk, start, 0, size); + int numRead = snapshotInputStream.read(nextChunk); + if (numRead != size) { + throw new IOException(String.format( + "The # of bytes read from the input stream, %d, does not match the expected # %d", numRead, size)); + } + nextChunkHashCode = Arrays.hashCode(nextChunk); LOG.debug("{}: Next chunk: total length={}, offset={}, size={}, hashCode={}", logName, - snapshotLength, start, size, nextChunkHashCode); + snapshotSize, start, size, nextChunkHashCode); return nextChunk; } /** - * reset should be called when the Follower needs to be sent the snapshot from the beginning + * Reset should be called when the Follower needs to be sent the snapshot from the beginning. */ - void reset(){ + void reset() { + closeStream(); + offset = 0; replyStatus = false; replyReceivedForOffset = offset; chunkIndex = FIRST_CHUNK_INDEX; lastChunkHashCode = INITIAL_LAST_CHUNK_HASH_CODE; + + try { + snapshotInputStream = snapshotBytes.openStream(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public void close() { + closeStream(); + snapshotBytes = null; + } + + private void closeStream() { + if (snapshotInputStream != null) { + try { + snapshotInputStream.close(); + } catch (IOException e) { + LOG.warn("{}: Error closing snapshot stream", logName); + } + + snapshotInputStream = null; + } } int getLastChunkHashCode() {