X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2Fbehaviors%2FLeaderInstallSnapshotState.java;h=946c56bec091e6dba8c490a1c9ede601e8d7e532;hp=e0d76a54a41b2a5bcd2afd14f2e1855b7595b163;hb=1b1360ac337d23b9a586f62616eb278c3065eef0;hpb=b0f8283587b5cc8573d29f66219cbe7f70e21e1b diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderInstallSnapshotState.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderInstallSnapshotState.java index e0d76a54a4..946c56bec0 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderInstallSnapshotState.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderInstallSnapshotState.java @@ -7,7 +7,9 @@ */ package org.opendaylight.controller.cluster.raft.behaviors; -import com.google.protobuf.ByteString; +import com.google.common.io.ByteSource; +import java.io.IOException; +import java.io.InputStream; import java.util.Arrays; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -15,7 +17,7 @@ import org.slf4j.LoggerFactory; /** * Encapsulates the leader state and logic for sending snapshot chunks to a follower. */ -public final class LeaderInstallSnapshotState { +public final class LeaderInstallSnapshotState implements AutoCloseable { private static final Logger LOG = LoggerFactory.getLogger(LeaderInstallSnapshotState.class); // The index of the first chunk that is sent when installing a snapshot @@ -29,7 +31,7 @@ public final class LeaderInstallSnapshotState { private final int snapshotChunkSize; private final String logName; - private ByteString snapshotBytes; + private ByteSource snapshotBytes; private int offset = 0; // the next snapshot chunk is sent only if the replyReceivedForOffset matches offset private int replyReceivedForOffset = -1; @@ -39,26 +41,27 @@ public final class LeaderInstallSnapshotState { private int totalChunks; private int lastChunkHashCode = INITIAL_LAST_CHUNK_HASH_CODE; private int nextChunkHashCode = INITIAL_LAST_CHUNK_HASH_CODE; + private long snapshotSize; + private InputStream snapshotInputStream; - LeaderInstallSnapshotState(int snapshotChunkSize, String logName) { + LeaderInstallSnapshotState(final int snapshotChunkSize, final String logName) { this.snapshotChunkSize = snapshotChunkSize; this.logName = logName; } - ByteString getSnapshotBytes() { - return snapshotBytes; - } - - void setSnapshotBytes(ByteString snapshotBytes) { + void setSnapshotBytes(final ByteSource snapshotBytes) throws IOException { if (this.snapshotBytes != null) { return; } + snapshotSize = snapshotBytes.size(); + snapshotInputStream = snapshotBytes.openStream(); + this.snapshotBytes = snapshotBytes; - int size = snapshotBytes.size(); - totalChunks = size / snapshotChunkSize + (size % snapshotChunkSize > 0 ? 1 : 0); - LOG.debug("{}: Snapshot {} bytes, total chunks to send: {}", logName, size, totalChunks); + totalChunks = (int) (snapshotSize / snapshotChunkSize + (snapshotSize % snapshotChunkSize > 0 ? 1 : 0)); + + LOG.debug("{}: Snapshot {} bytes, total chunks to send: {}", logName, snapshotSize, totalChunks); replyReceivedForOffset = -1; chunkIndex = FIRST_CHUNK_INDEX; @@ -90,14 +93,15 @@ public final class LeaderInstallSnapshotState { boolean canSendNextChunk() { // we only send a false if a chunk is sent but we have not received a reply yet - return snapshotBytes != null && replyReceivedForOffset == offset; + return snapshotBytes != null && (nextChunkHashCode == INITIAL_LAST_CHUNK_HASH_CODE + || replyReceivedForOffset == offset); } - boolean isLastChunk(int index) { + boolean isLastChunk(final int index) { return totalChunks == index; } - void markSendStatus(boolean success) { + void markSendStatus(final boolean success) { if (success) { // if the chunk sent was successful replyReceivedForOffset = offset; @@ -110,22 +114,26 @@ public final class LeaderInstallSnapshotState { } } - byte[] getNextChunk() { - int snapshotLength = getSnapshotBytes().size(); + byte[] getNextChunk() throws IOException { int start = incrementOffset(); int size = snapshotChunkSize; - if (snapshotChunkSize > snapshotLength) { - size = snapshotLength; - } else if (start + snapshotChunkSize > snapshotLength) { - size = snapshotLength - start; + if (snapshotChunkSize > snapshotSize) { + size = (int) snapshotSize; + } else if (start + snapshotChunkSize > snapshotSize) { + size = (int) (snapshotSize - start); } byte[] nextChunk = new byte[size]; - getSnapshotBytes().copyTo(nextChunk, start, 0, size); + int numRead = snapshotInputStream.read(nextChunk); + if (numRead != size) { + throw new IOException(String.format( + "The # of bytes read from the input stream, %d, does not match the expected # %d", numRead, size)); + } + nextChunkHashCode = Arrays.hashCode(nextChunk); LOG.debug("{}: Next chunk: total length={}, offset={}, size={}, hashCode={}", logName, - snapshotLength, start, size, nextChunkHashCode); + snapshotSize, start, size, nextChunkHashCode); return nextChunk; } @@ -133,11 +141,37 @@ public final class LeaderInstallSnapshotState { * Reset should be called when the Follower needs to be sent the snapshot from the beginning. */ void reset() { + closeStream(); + offset = 0; replyStatus = false; replyReceivedForOffset = offset; chunkIndex = FIRST_CHUNK_INDEX; lastChunkHashCode = INITIAL_LAST_CHUNK_HASH_CODE; + + try { + snapshotInputStream = snapshotBytes.openStream(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public void close() { + closeStream(); + snapshotBytes = null; + } + + private void closeStream() { + if (snapshotInputStream != null) { + try { + snapshotInputStream.close(); + } catch (IOException e) { + LOG.warn("{}: Error closing snapshot stream", logName); + } + + snapshotInputStream = null; + } } int getLastChunkHashCode() {