2 * Copyright (c) 2016 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.raft.behaviors;
10 import com.google.common.base.MoreObjects;
11 import com.google.common.base.Stopwatch;
12 import com.google.common.io.ByteSource;
13 import java.io.IOException;
14 import java.io.InputStream;
15 import java.util.Arrays;
16 import java.util.concurrent.TimeUnit;
17 import org.slf4j.Logger;
18 import org.slf4j.LoggerFactory;
19 import scala.concurrent.duration.FiniteDuration;
22 * Encapsulates the leader state and logic for sending snapshot chunks to a follower.
24 public final class LeaderInstallSnapshotState implements AutoCloseable {
25 private static final Logger LOG = LoggerFactory.getLogger(LeaderInstallSnapshotState.class);
27 // The index of the first chunk that is sent when installing a snapshot
28 static final int FIRST_CHUNK_INDEX = 1;
30 // The index that the follower should respond with if it needs the install snapshot to be reset
31 static final int INVALID_CHUNK_INDEX = -1;
33 // This would be passed as the hash code of the last chunk when sending the first chunk
34 static final int INITIAL_LAST_CHUNK_HASH_CODE = -1;
36 private final int snapshotChunkSize;
37 private final String logName;
38 private ByteSource snapshotBytes;
39 private int offset = 0;
40 // the next snapshot chunk is sent only if the replyReceivedForOffset matches offset
41 private int replyReceivedForOffset = -1;
42 // if replyStatus is false, the previous chunk is attempted
43 private boolean replyStatus = false;
44 private int chunkIndex = FIRST_CHUNK_INDEX;
45 private int totalChunks;
46 private int lastChunkHashCode = INITIAL_LAST_CHUNK_HASH_CODE;
47 private int nextChunkHashCode = INITIAL_LAST_CHUNK_HASH_CODE;
48 private long snapshotSize;
49 private InputStream snapshotInputStream;
50 private Stopwatch chunkTimer = Stopwatch.createUnstarted();
51 private byte[] currentChunk = null;
/**
 * Creates the state holder for one snapshot installation.
 *
 * @param snapshotChunkSize the size used to split the snapshot into chunks
 * @param logName the prefix used in log messages emitted by this instance
 */
LeaderInstallSnapshotState(final int snapshotChunkSize, final String logName) {
    this.logName = logName;
    this.snapshotChunkSize = snapshotChunkSize;
}
58 void setSnapshotBytes(final ByteSource snapshotBytes) throws IOException {
59 if (this.snapshotBytes != null) {
63 snapshotSize = snapshotBytes.size();
64 snapshotInputStream = snapshotBytes.openStream();
66 this.snapshotBytes = snapshotBytes;
68 totalChunks = (int) (snapshotSize / snapshotChunkSize + (snapshotSize % snapshotChunkSize > 0 ? 1 : 0));
70 LOG.debug("{}: Snapshot {} bytes, total chunks to send: {}", logName, snapshotSize, totalChunks);
72 replyReceivedForOffset = -1;
73 chunkIndex = FIRST_CHUNK_INDEX;
76 int incrementOffset() {
// if prev chunk failed, we would want to send the same chunk again
79 offset = offset + snapshotChunkSize;
84 int incrementChunkIndex() {
// if prev chunk failed, we would want to send the same chunk again
87 chunkIndex = chunkIndex + 1;
92 void startChunkTimer() {
96 void resetChunkTimer() {
100 boolean isChunkTimedOut(final FiniteDuration timeout) {
101 return chunkTimer.elapsed(TimeUnit.SECONDS) > timeout.toSeconds();
104 int getChunkIndex() {
108 int getTotalChunks() {
112 boolean canSendNextChunk() {
113 // we only send a false if a chunk is sent but we have not received a reply yet
114 return snapshotBytes != null && (nextChunkHashCode == INITIAL_LAST_CHUNK_HASH_CODE
115 || replyReceivedForOffset == offset);
118 boolean isLastChunk(final int index) {
119 return totalChunks == index;
122 void markSendStatus(final boolean success) {
124 // if the chunk sent was successful
125 replyReceivedForOffset = offset;
127 lastChunkHashCode = nextChunkHashCode;
129 // if the chunk sent was failure
130 replyReceivedForOffset = offset;
135 byte[] getNextChunk() throws IOException {
136 if (replyStatus || currentChunk == null) {
137 int start = incrementOffset();
138 int size = snapshotChunkSize;
139 if (snapshotChunkSize > snapshotSize) {
140 size = (int) snapshotSize;
141 } else if (start + snapshotChunkSize > snapshotSize) {
142 size = (int) (snapshotSize - start);
145 currentChunk = new byte[size];
146 int numRead = snapshotInputStream.read(currentChunk);
147 if (numRead != size) {
148 throw new IOException(String.format(
149 "The # of bytes read from the input stream, %d,"
150 + "does not match the expected # %d", numRead, size));
153 nextChunkHashCode = Arrays.hashCode(currentChunk);
155 LOG.debug("{}: Next chunk: total length={}, offset={}, size={}, hashCode={}", logName,
156 snapshotSize, start, size, nextChunkHashCode);
163 * Reset should be called when the Follower needs to be sent the snapshot from the beginning.
171 replyReceivedForOffset = offset;
172 chunkIndex = FIRST_CHUNK_INDEX;
174 lastChunkHashCode = INITIAL_LAST_CHUNK_HASH_CODE;
177 snapshotInputStream = snapshotBytes.openStream();
178 } catch (IOException e) {
179 throw new RuntimeException(e);
184 public void close() {
186 snapshotBytes = null;
189 private void closeStream() {
190 if (snapshotInputStream != null) {
192 snapshotInputStream.close();
193 } catch (IOException e) {
194 LOG.warn("{}: Error closing snapshot stream", logName);
197 snapshotInputStream = null;
201 int getLastChunkHashCode() {
202 return lastChunkHashCode;
206 public String toString() {
207 return MoreObjects.toStringHelper(this)
208 .add("snapshotChunkSize", snapshotChunkSize)
209 .add("offset", offset)
210 .add("replyReceivedForOffset", replyReceivedForOffset)
211 .add("replyStatus", replyStatus)
212 .add("chunkIndex", chunkIndex)
213 .add("totalChunks", totalChunks)
214 .add("lastChunkHashCode", lastChunkHashCode)
215 .add("nextChunkHashCode", nextChunkHashCode)
216 .add("snapshotSize", snapshotSize)
217 .add("chunkTimer", chunkTimer)