/*
 * Copyright (c) 2016 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
8 package org.opendaylight.controller.cluster.raft.behaviors;
10 import com.google.common.base.Throwables;
11 import com.google.common.io.ByteSource;
12 import java.io.IOException;
13 import java.io.InputStream;
14 import java.util.Arrays;
15 import org.slf4j.Logger;
16 import org.slf4j.LoggerFactory;
19 * Encapsulates the leader state and logic for sending snapshot chunks to a follower.
21 public final class LeaderInstallSnapshotState implements AutoCloseable {
22 private static final Logger LOG = LoggerFactory.getLogger(LeaderInstallSnapshotState.class);
24 // The index of the first chunk that is sent when installing a snapshot
25 static final int FIRST_CHUNK_INDEX = 1;
27 // The index that the follower should respond with if it needs the install snapshot to be reset
28 static final int INVALID_CHUNK_INDEX = -1;
30 // This would be passed as the hash code of the last chunk when sending the first chunk
31 static final int INITIAL_LAST_CHUNK_HASH_CODE = -1;
33 private final int snapshotChunkSize;
34 private final String logName;
35 private ByteSource snapshotBytes;
36 private int offset = 0;
37 // the next snapshot chunk is sent only if the replyReceivedForOffset matches offset
38 private int replyReceivedForOffset = -1;
39 // if replyStatus is false, the previous chunk is attempted
40 private boolean replyStatus = false;
41 private int chunkIndex = FIRST_CHUNK_INDEX;
42 private int totalChunks;
43 private int lastChunkHashCode = INITIAL_LAST_CHUNK_HASH_CODE;
44 private int nextChunkHashCode = INITIAL_LAST_CHUNK_HASH_CODE;
45 private long snapshotSize;
46 private InputStream snapshotInputStream;
48 LeaderInstallSnapshotState(int snapshotChunkSize, String logName) {
49 this.snapshotChunkSize = snapshotChunkSize;
50 this.logName = logName;
53 void setSnapshotBytes(ByteSource snapshotBytes) throws IOException {
54 if (this.snapshotBytes != null) {
58 snapshotSize = snapshotBytes.size();
59 snapshotInputStream = snapshotBytes.openStream();
61 this.snapshotBytes = snapshotBytes;
63 totalChunks = (int) (snapshotSize / snapshotChunkSize + (snapshotSize % snapshotChunkSize > 0 ? 1 : 0));
65 LOG.debug("{}: Snapshot {} bytes, total chunks to send: {}", logName, snapshotSize, totalChunks);
67 replyReceivedForOffset = -1;
68 chunkIndex = FIRST_CHUNK_INDEX;
71 int incrementOffset() {
73 // if prev chunk failed, we would want to sent the same chunk again
74 offset = offset + snapshotChunkSize;
79 int incrementChunkIndex() {
81 // if prev chunk failed, we would want to sent the same chunk again
82 chunkIndex = chunkIndex + 1;
91 int getTotalChunks() {
95 boolean canSendNextChunk() {
96 // we only send a false if a chunk is sent but we have not received a reply yet
97 return snapshotBytes != null && (nextChunkHashCode == INITIAL_LAST_CHUNK_HASH_CODE
98 || replyReceivedForOffset == offset);
101 boolean isLastChunk(int index) {
102 return totalChunks == index;
105 void markSendStatus(boolean success) {
107 // if the chunk sent was successful
108 replyReceivedForOffset = offset;
110 lastChunkHashCode = nextChunkHashCode;
112 // if the chunk sent was failure
113 replyReceivedForOffset = offset;
118 byte[] getNextChunk() throws IOException {
119 int start = incrementOffset();
120 int size = snapshotChunkSize;
121 if (snapshotChunkSize > snapshotSize) {
122 size = (int) snapshotSize;
123 } else if (start + snapshotChunkSize > snapshotSize) {
124 size = (int) (snapshotSize - start);
127 byte[] nextChunk = new byte[size];
128 int numRead = snapshotInputStream.read(nextChunk);
129 if (numRead != size) {
130 throw new IOException(String.format(
131 "The # of bytes read from the input stream, %d, does not match the expected # %d", numRead, size));
134 nextChunkHashCode = Arrays.hashCode(nextChunk);
136 LOG.debug("{}: Next chunk: total length={}, offset={}, size={}, hashCode={}", logName,
137 snapshotSize, start, size, nextChunkHashCode);
142 * Reset should be called when the Follower needs to be sent the snapshot from the beginning.
149 replyReceivedForOffset = offset;
150 chunkIndex = FIRST_CHUNK_INDEX;
151 lastChunkHashCode = INITIAL_LAST_CHUNK_HASH_CODE;
154 snapshotInputStream = snapshotBytes.openStream();
155 } catch (IOException e) {
156 throw Throwables.propagate(e);
161 public void close() {
163 snapshotBytes = null;
166 private void closeStream() {
167 if (snapshotInputStream != null) {
169 snapshotInputStream.close();
170 } catch (IOException e) {
171 LOG.warn("{}: Error closing snapshot stream", logName);
174 snapshotInputStream = null;
178 int getLastChunkHashCode() {
179 return lastChunkHashCode;