2 * Copyright (c) 2016 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.raft.behaviors;
10 import com.google.common.base.MoreObjects;
11 import com.google.common.base.Stopwatch;
12 import com.google.common.io.ByteSource;
13 import java.io.IOException;
14 import java.io.InputStream;
15 import java.util.Arrays;
16 import java.util.concurrent.TimeUnit;
17 import org.slf4j.Logger;
18 import org.slf4j.LoggerFactory;
19 import scala.concurrent.duration.FiniteDuration;
22 * Encapsulates the leader state and logic for sending snapshot chunks to a follower.
24 public final class LeaderInstallSnapshotState implements AutoCloseable {
// Class-wide logger; messages in this class are prefixed with logName.
25 private static final Logger LOG = LoggerFactory.getLogger(LeaderInstallSnapshotState.class);
27 // The index of the first chunk that is sent when installing a snapshot
28 static final int FIRST_CHUNK_INDEX = 1;
30 // The index that the follower should respond with if it needs the install snapshot to be reset
31 static final int INVALID_CHUNK_INDEX = -1;
// Sentinel value of offset (and replyReceivedForOffset) before any chunk has been sent or after a reset;
// incrementOffset() checks for it to decide where the first chunk starts.
33 static final int INITIAL_OFFSET = -1;
35 // This would be passed as the hash code of the last chunk when sending the first chunk
36 static final int INITIAL_LAST_CHUNK_HASH_CODE = -1;
// Size in bytes of a full chunk; also the step by which offset advances.
38 private final int snapshotChunkSize;
// Identifier placed at the front of every log message emitted by this instance.
39 private final String logName;
// Source of the snapshot data; null until setSnapshotBytes() is called and again after close().
40 private ByteSource snapshotBytes;
// Byte offset, within the snapshot, of the chunk handed out by getNextChunk().
41 private int offset = INITIAL_OFFSET;
42 // the next snapshot chunk is sent only if the replyReceivedForOffset matches offset
43 private int replyReceivedForOffset = -1;
44 // if replyStatus is false, the previous chunk is attempted
45 private boolean replyStatus = false;
// Index of the current chunk, starting at FIRST_CHUNK_INDEX.
46 private int chunkIndex = FIRST_CHUNK_INDEX;
// Number of chunks needed for the whole snapshot; computed in setSnapshotBytes().
47 private int totalChunks;
// Hash code of the last chunk whose send was marked in markSendStatus().
48 private int lastChunkHashCode = INITIAL_LAST_CHUNK_HASH_CODE;
// Hash code (Arrays.hashCode) of the chunk most recently read by getNextChunk().
49 private int nextChunkHashCode = INITIAL_LAST_CHUNK_HASH_CODE;
// Total snapshot size in bytes, as reported by snapshotBytes.size().
50 private long snapshotSize;
// Stream opened from snapshotBytes from which chunks are read sequentially.
51 private InputStream snapshotInputStream;
// Timer consulted by isChunkTimedOut(); created unstarted.
52 private Stopwatch chunkTimer = Stopwatch.createUnstarted();
// Bytes of the chunk most recently read by getNextChunk(); null until the first read.
53 private byte[] currentChunk = null;
/**
 * Creates a new state holder with no snapshot attached yet.
 *
 * @param snapshotChunkSize the size, in bytes, of each snapshot chunk
 * @param logName the identifier placed at the front of this instance's log messages
 */
LeaderInstallSnapshotState(final int snapshotChunkSize, final String logName) {
    this.logName = logName;
    this.snapshotChunkSize = snapshotChunkSize;
}
// Attaches the snapshot data: records its size, opens the input stream, computes the number of
// chunks, and puts replyReceivedForOffset and chunkIndex back to their starting values.
// An IOException from ByteSource.size() or ByteSource.openStream() propagates to the caller.
60 void setSnapshotBytes(final ByteSource snapshotBytes) throws IOException {
// Guard for the case where snapshot bytes were already attached.
// NOTE(review): the body of this guard is not visible in this listing - presumably an early
// return so an in-progress install is not restarted; confirm against the full file.
61 if (this.snapshotBytes != null) {
65 snapshotSize = snapshotBytes.size();
66 snapshotInputStream = snapshotBytes.openStream();
68 this.snapshotBytes = snapshotBytes;
// Ceiling division: one extra chunk when the size is not an exact multiple of the chunk size.
// NOTE(review): a snapshotChunkSize of 0 throws ArithmeticException here, and the (int) cast
// silently truncates a chunk count that does not fit in an int.
70 totalChunks = (int) (snapshotSize / snapshotChunkSize + (snapshotSize % snapshotChunkSize > 0 ? 1 : 0));
72 LOG.debug("{}: Snapshot {} bytes, total chunks to send: {}", logName, snapshotSize, totalChunks);
74 replyReceivedForOffset = INITIAL_OFFSET;
75 chunkIndex = FIRST_CHUNK_INDEX;
// Moves offset forward to the start of the next chunk.
// NOTE(review): several lines of this method are not visible in this listing (the assignment
// taken when offset is still INITIAL_OFFSET, the else branch, and the return). getNextChunk()
// uses the returned value as the chunk's start offset - presumably the updated offset; confirm.
78 int incrementOffset() {
79 // if offset is -1, it doesn't matter whether it was the initial value or a reset; move the offset to 0 to begin with
80 if (offset == INITIAL_OFFSET) {
// Any later call advances by exactly one chunk.
83 offset = offset + snapshotChunkSize;
// Advances chunkIndex by one.
// NOTE(review): the condition guarding the increment and the return statement are not visible
// in this listing - presumably the increment only happens when the previous chunk succeeded
// (replyStatus) and the resulting chunkIndex is returned; confirm against the full file.
88 int incrementChunkIndex() {
90 // if the previous chunk failed, we want to send the same chunk again (so the index is left unchanged)
91 chunkIndex = chunkIndex + 1;
// NOTE(review): the body is not visible in this listing - presumably starts chunkTimer, the
// Stopwatch that isChunkTimedOut() reads; confirm against the full file.
96 void startChunkTimer() {
// NOTE(review): the body is not visible in this listing - presumably resets chunkTimer, the
// Stopwatch that isChunkTimedOut() reads; confirm against the full file.
100 void resetChunkTimer() {
104 boolean isChunkTimedOut(final FiniteDuration timeout) {
105 return chunkTimer.elapsed(TimeUnit.SECONDS) > timeout.toSeconds();
// NOTE(review): the body is not visible in this listing - presumably returns the chunkIndex
// field; confirm against the full file.
108 int getChunkIndex() {
// NOTE(review): the body is not visible in this listing - presumably returns the totalChunks
// field computed in setSnapshotBytes(); confirm against the full file.
112 int getTotalChunks() {
116 boolean canSendNextChunk() {
117 // we only send a false if a chunk is sent but we have not received a reply yet
118 return snapshotBytes != null && (nextChunkHashCode == INITIAL_LAST_CHUNK_HASH_CODE
119 || replyReceivedForOffset == offset);
122 boolean isLastChunk(final int index) {
123 return totalChunks == index;
// Records the outcome of the chunk that was last sent.
// NOTE(review): the if/else framing on 'success' and some statements are not visible in this
// listing; the two groups below are the success and failure paths according to their comments.
// Presumably replyStatus is also updated here - confirm against the full file.
126 void markSendStatus(final boolean success) {
128 // if the chunk was sent successfully, remember that a reply was received for the current offset
129 replyReceivedForOffset = offset;
// The chunk just acknowledged becomes the "last chunk" whose hash accompanies the next one.
131 lastChunkHashCode = nextChunkHashCode;
133 // if sending the chunk failed, revert offset to the previous value so we can retry
134 offset = replyReceivedForOffset;
139 byte[] getNextChunk() throws IOException {
140 // increment offset to indicate next chunk is in flight, canSendNextChunk() wont let us hit this again until,
141 // markSendStatus() is called with either success or failure
142 int start = incrementOffset();
143 if (replyStatus || currentChunk == null) {
144 int size = snapshotChunkSize;
145 if (snapshotChunkSize > snapshotSize) {
146 size = (int) snapshotSize;
147 } else if (start + snapshotChunkSize > snapshotSize) {
148 size = (int) (snapshotSize - start);
151 currentChunk = new byte[size];
152 int numRead = snapshotInputStream.read(currentChunk);
153 if (numRead != size) {
154 throw new IOException(String.format(
155 "The # of bytes read from the input stream, %d,"
156 + "does not match the expected # %d", numRead, size));
159 nextChunkHashCode = Arrays.hashCode(currentChunk);
161 LOG.debug("{}: Next chunk: total length={}, offset={}, size={}, hashCode={}", logName,
162 snapshotSize, start, size, nextChunkHashCode);
// NOTE(review): the Javadoc delimiters, the method signature and several statements of this
// method are not visible in this listing. The visible statements put the send-progress fields
// back to their initial values and re-open the stream from snapshotBytes; confirm the rest
// (e.g. whether the old stream is closed first) against the full file.
169 * Reset should be called when the Follower needs to be sent the snapshot from the beginning.
175 offset = INITIAL_OFFSET;
177 replyReceivedForOffset = INITIAL_OFFSET;
178 chunkIndex = FIRST_CHUNK_INDEX;
180 lastChunkHashCode = INITIAL_LAST_CHUNK_HASH_CODE;
181 nextChunkHashCode = INITIAL_LAST_CHUNK_HASH_CODE;
// Re-open the stream so reading starts again at the first byte of the snapshot.
184 snapshotInputStream = snapshotBytes.openStream();
185 } catch (IOException e) {
// Rethrown unchecked, with the IOException kept as the cause.
186 throw new IllegalStateException(e);
// Releases the snapshot held by this state.
// NOTE(review): a line between the signature and the assignment is not visible in this
// listing - presumably a call to closeStream(); confirm against the full file.
191 public void close() {
// Drop the reference; canSendNextChunk() returns false while snapshotBytes is null.
193 snapshotBytes = null;
196 private void closeStream() {
197 if (snapshotInputStream != null) {
199 snapshotInputStream.close();
200 } catch (IOException e) {
201 LOG.warn("{}: Error closing snapshot stream", logName);
204 snapshotInputStream = null;
208 int getLastChunkHashCode() {
209 return lastChunkHashCode;
213 public String toString() {
214 return MoreObjects.toStringHelper(this)
215 .add("snapshotChunkSize", snapshotChunkSize)
216 .add("offset", offset)
217 .add("replyReceivedForOffset", replyReceivedForOffset)
218 .add("replyStatus", replyStatus)
219 .add("chunkIndex", chunkIndex)
220 .add("totalChunks", totalChunks)
221 .add("lastChunkHashCode", lastChunkHashCode)
222 .add("nextChunkHashCode", nextChunkHashCode)
223 .add("snapshotSize", snapshotSize)
224 .add("chunkTimer", chunkTimer)