2 * Copyright (c) 2016 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.raft.behaviors;
10 import com.google.common.base.Stopwatch;
11 import com.google.common.io.ByteSource;
12 import java.io.IOException;
13 import java.io.InputStream;
14 import java.util.Arrays;
15 import java.util.concurrent.TimeUnit;
16 import org.slf4j.Logger;
17 import org.slf4j.LoggerFactory;
18 import scala.concurrent.duration.FiniteDuration;
21 * Encapsulates the leader state and logic for sending snapshot chunks to a follower.
23 public final class LeaderInstallSnapshotState implements AutoCloseable {
24 private static final Logger LOG = LoggerFactory.getLogger(LeaderInstallSnapshotState.class);
26 // The index of the first chunk that is sent when installing a snapshot
27 static final int FIRST_CHUNK_INDEX = 1;
29 // The index that the follower should respond with if it needs the install snapshot to be reset
30 static final int INVALID_CHUNK_INDEX = -1;
32 // This would be passed as the hash code of the last chunk when sending the first chunk
33 static final int INITIAL_LAST_CHUNK_HASH_CODE = -1;
35 private final int snapshotChunkSize;
36 private final String logName;
37 private ByteSource snapshotBytes;
38 private int offset = 0;
39 // the next snapshot chunk is sent only if the replyReceivedForOffset matches offset
40 private int replyReceivedForOffset = -1;
41 // if replyStatus is false, the previous chunk is attempted
42 private boolean replyStatus = false;
43 private int chunkIndex = FIRST_CHUNK_INDEX;
44 private int totalChunks;
45 private int lastChunkHashCode = INITIAL_LAST_CHUNK_HASH_CODE;
46 private int nextChunkHashCode = INITIAL_LAST_CHUNK_HASH_CODE;
47 private long snapshotSize;
48 private InputStream snapshotInputStream;
49 private Stopwatch chunkTimer = Stopwatch.createUnstarted();
50 private byte[] currentChunk = null;
52 LeaderInstallSnapshotState(final int snapshotChunkSize, final String logName) {
53 this.snapshotChunkSize = snapshotChunkSize;
54 this.logName = logName;
57 void setSnapshotBytes(final ByteSource snapshotBytes) throws IOException {
58 if (this.snapshotBytes != null) {
62 snapshotSize = snapshotBytes.size();
63 snapshotInputStream = snapshotBytes.openStream();
65 this.snapshotBytes = snapshotBytes;
67 totalChunks = (int) (snapshotSize / snapshotChunkSize + (snapshotSize % snapshotChunkSize > 0 ? 1 : 0));
69 LOG.debug("{}: Snapshot {} bytes, total chunks to send: {}", logName, snapshotSize, totalChunks);
71 replyReceivedForOffset = -1;
72 chunkIndex = FIRST_CHUNK_INDEX;
75 int incrementOffset() {
77 // if prev chunk failed, we would want to sent the same chunk again
78 offset = offset + snapshotChunkSize;
83 int incrementChunkIndex() {
85 // if prev chunk failed, we would want to sent the same chunk again
86 chunkIndex = chunkIndex + 1;
91 void startChunkTimer() {
95 void resetChunkTimer() {
99 boolean isChunkTimedOut(final FiniteDuration timeout) {
100 return chunkTimer.elapsed(TimeUnit.SECONDS) > timeout.toSeconds();
103 int getChunkIndex() {
107 int getTotalChunks() {
111 boolean canSendNextChunk() {
112 // we only send a false if a chunk is sent but we have not received a reply yet
113 return snapshotBytes != null && (nextChunkHashCode == INITIAL_LAST_CHUNK_HASH_CODE
114 || replyReceivedForOffset == offset);
117 boolean isLastChunk(final int index) {
118 return totalChunks == index;
121 void markSendStatus(final boolean success) {
123 // if the chunk sent was successful
124 replyReceivedForOffset = offset;
126 lastChunkHashCode = nextChunkHashCode;
128 // if the chunk sent was failure
129 replyReceivedForOffset = offset;
134 byte[] getNextChunk() throws IOException {
135 if (replyStatus || currentChunk == null) {
136 int start = incrementOffset();
137 int size = snapshotChunkSize;
138 if (snapshotChunkSize > snapshotSize) {
139 size = (int) snapshotSize;
140 } else if (start + snapshotChunkSize > snapshotSize) {
141 size = (int) (snapshotSize - start);
144 currentChunk = new byte[size];
145 int numRead = snapshotInputStream.read(currentChunk);
146 if (numRead != size) {
147 throw new IOException(String.format(
148 "The # of bytes read from the input stream, %d,"
149 + "does not match the expected # %d", numRead, size));
152 nextChunkHashCode = Arrays.hashCode(currentChunk);
154 LOG.debug("{}: Next chunk: total length={}, offset={}, size={}, hashCode={}", logName,
155 snapshotSize, start, size, nextChunkHashCode);
162 * Reset should be called when the Follower needs to be sent the snapshot from the beginning.
170 replyReceivedForOffset = offset;
171 chunkIndex = FIRST_CHUNK_INDEX;
173 lastChunkHashCode = INITIAL_LAST_CHUNK_HASH_CODE;
176 snapshotInputStream = snapshotBytes.openStream();
177 } catch (IOException e) {
178 throw new RuntimeException(e);
183 public void close() {
185 snapshotBytes = null;
188 private void closeStream() {
189 if (snapshotInputStream != null) {
191 snapshotInputStream.close();
192 } catch (IOException e) {
193 LOG.warn("{}: Error closing snapshot stream", logName);
196 snapshotInputStream = null;
200 int getLastChunkHashCode() {
201 return lastChunkHashCode;