2 * Copyright (c) 2016 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.raft.behaviors;
10 import com.google.common.io.ByteSource;
11 import java.io.IOException;
12 import java.io.InputStream;
13 import java.util.Arrays;
14 import org.slf4j.Logger;
15 import org.slf4j.LoggerFactory;
18 * Encapsulates the leader state and logic for sending snapshot chunks to a follower.
20 public final class LeaderInstallSnapshotState implements AutoCloseable {
21 private static final Logger LOG = LoggerFactory.getLogger(LeaderInstallSnapshotState.class);
23 // The index of the first chunk that is sent when installing a snapshot
24 static final int FIRST_CHUNK_INDEX = 1;
26 // The index that the follower should respond with if it needs the install snapshot to be reset
27 static final int INVALID_CHUNK_INDEX = -1;
29 // This would be passed as the hash code of the last chunk when sending the first chunk
30 static final int INITIAL_LAST_CHUNK_HASH_CODE = -1;
32 private final int snapshotChunkSize;
33 private final String logName;
34 private ByteSource snapshotBytes;
35 private int offset = 0;
36 // the next snapshot chunk is sent only if the replyReceivedForOffset matches offset
37 private int replyReceivedForOffset = -1;
38 // if replyStatus is false, the previous chunk is attempted
39 private boolean replyStatus = false;
40 private int chunkIndex = FIRST_CHUNK_INDEX;
41 private int totalChunks;
42 private int lastChunkHashCode = INITIAL_LAST_CHUNK_HASH_CODE;
43 private int nextChunkHashCode = INITIAL_LAST_CHUNK_HASH_CODE;
44 private long snapshotSize;
45 private InputStream snapshotInputStream;
/**
 * Creates the install-snapshot state.
 *
 * @param snapshotChunkSize the size used when splitting the snapshot into chunks
 * @param logName the name used as the prefix of this object's log messages
 */
LeaderInstallSnapshotState(final int snapshotChunkSize, final String logName) {
    this.logName = logName;
    this.snapshotChunkSize = snapshotChunkSize;
}
/**
 * Stores the snapshot to be installed: records its size, opens an input stream over it, computes the
 * number of chunks needed to send it and re-initializes the reply offset and the chunk index.
 *
 * @param snapshotBytes the source of the snapshot bytes
 * @throws IOException if the size cannot be determined or the stream cannot be opened
 */
52 void setSnapshotBytes(final ByteSource snapshotBytes) throws IOException {
// NOTE(review): the body of this guard is not visible in this listing; presumably the method does
// nothing further when snapshot bytes have already been set - confirm against the full source.
53 if (this.snapshotBytes != null) {
57 snapshotSize = snapshotBytes.size();
58 snapshotInputStream = snapshotBytes.openStream();
60 this.snapshotBytes = snapshotBytes;
// Ceiling division: a trailing partial chunk counts as one additional chunk.
62 totalChunks = (int) (snapshotSize / snapshotChunkSize + (snapshotSize % snapshotChunkSize > 0 ? 1 : 0));
64 LOG.debug("{}: Snapshot {} bytes, total chunks to send: {}", logName, snapshotSize, totalChunks);
// No reply has been received yet and sending starts at the first chunk.
66 replyReceivedForOffset = -1;
67 chunkIndex = FIRST_CHUNK_INDEX;
/**
 * Advances {@code offset} by one chunk size.
 *
 * <p>NOTE(review): the condition guarding the increment and the return statement are not visible in this
 * listing; going by the inline comment, the increment is presumably skipped when the previous chunk
 * failed - confirm against the full source.
 */
70 int incrementOffset() {
72 // if the previous chunk failed, we want to send the same chunk again
73 offset = offset + snapshotChunkSize;
/**
 * Advances {@code chunkIndex} by one.
 *
 * <p>NOTE(review): the condition guarding the increment and the return statement are not visible in this
 * listing; going by the inline comment, the increment is presumably skipped when the previous chunk
 * failed - confirm against the full source.
 */
78 int incrementChunkIndex() {
80 // if the previous chunk failed, we want to send the same chunk again
81 chunkIndex = chunkIndex + 1;
// NOTE(review): only the signature is visible in this listing; presumably this returns the totalChunks
// value computed in setSnapshotBytes - confirm against the full source.
90 int getTotalChunks() {
94 boolean canSendNextChunk() {
95 // we only send a false if a chunk is sent but we have not received a reply yet
96 return snapshotBytes != null && (nextChunkHashCode == INITIAL_LAST_CHUNK_HASH_CODE
97 || replyReceivedForOffset == offset);
100 boolean isLastChunk(final int index) {
101 return totalChunks == index;
/**
 * Records the outcome of the most recently sent chunk.
 *
 * <p>NOTE(review): the branching on {@code success} is not visible in this listing; going by the inline
 * comments, the first group of statements is the success path and the second the failure path - confirm
 * against the full source.
 *
 * @param success whether the chunk was sent successfully
 */
104 void markSendStatus(final boolean success) {
106 // the chunk was sent successfully: note the reply and remember the hash code of the chunk just sent
107 replyReceivedForOffset = offset;
109 lastChunkHashCode = nextChunkHashCode;
111 // sending the chunk failed: a reply was still received for the current offset
112 replyReceivedForOffset = offset;
117 byte[] getNextChunk() throws IOException {
118 int start = incrementOffset();
119 int size = snapshotChunkSize;
120 if (snapshotChunkSize > snapshotSize) {
121 size = (int) snapshotSize;
122 } else if (start + snapshotChunkSize > snapshotSize) {
123 size = (int) (snapshotSize - start);
126 byte[] nextChunk = new byte[size];
127 int numRead = snapshotInputStream.read(nextChunk);
128 if (numRead != size) {
129 throw new IOException(String.format(
130 "The # of bytes read from the input stream, %d, does not match the expected # %d", numRead, size));
133 nextChunkHashCode = Arrays.hashCode(nextChunk);
135 LOG.debug("{}: Next chunk: total length={}, offset={}, size={}, hashCode={}", logName,
136 snapshotSize, start, size, nextChunkHashCode);
141 * Reset should be called when the Follower needs to be sent the snapshot from the beginning.
// NOTE(review): the method header and any statements preceding this point are not visible in this
// listing; the comments below describe only the visible statements.
// Restart the chunk bookkeeping from the first chunk.
148 replyReceivedForOffset = offset;
149 chunkIndex = FIRST_CHUNK_INDEX;
150 lastChunkHashCode = INITIAL_LAST_CHUNK_HASH_CODE;
// Open a new stream from the stored snapshot bytes; a failure to open is rethrown unchecked.
// NOTE(review): snapshotBytes is dereferenced without a null check here - confirm callers never reach
// this point after close() has cleared it.
153 snapshotInputStream = snapshotBytes.openStream();
154 } catch (IOException e) {
155 throw new RuntimeException(e);
/**
 * Drops the reference to the snapshot bytes.
 *
 * <p>NOTE(review): a statement between the two visible lines is missing from this listing; presumably it
 * closes the snapshot stream - confirm against the full source.
 */
160 public void close() {
162 snapshotBytes = null;
165 private void closeStream() {
166 if (snapshotInputStream != null) {
168 snapshotInputStream.close();
169 } catch (IOException e) {
170 LOG.warn("{}: Error closing snapshot stream", logName);
173 snapshotInputStream = null;
177 int getLastChunkHashCode() {
178 return lastChunkHashCode;