2 * Copyright (c) 2016 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.raft.behaviors;
10 import com.google.common.base.Throwables;
11 import com.google.common.io.ByteSource;
12 import java.io.IOException;
13 import java.io.InputStream;
14 import java.util.Arrays;
15 import org.slf4j.Logger;
16 import org.slf4j.LoggerFactory;
19 * Encapsulates the leader state and logic for sending snapshot chunks to a follower.
21 public final class LeaderInstallSnapshotState implements AutoCloseable {
22 private static final Logger LOG = LoggerFactory.getLogger(LeaderInstallSnapshotState.class);
24 // The index of the first chunk that is sent when installing a snapshot
25 static final int FIRST_CHUNK_INDEX = 1;
27 // The index that the follower should respond with if it needs the install snapshot to be reset
28 static final int INVALID_CHUNK_INDEX = -1;
30 // This would be passed as the hash code of the last chunk when sending the first chunk
31 static final int INITIAL_LAST_CHUNK_HASH_CODE = -1;
33 private final int snapshotChunkSize;
34 private final String logName;
35 private ByteSource snapshotBytes;
36 private int offset = 0;
37 // the next snapshot chunk is sent only if the replyReceivedForOffset matches offset
38 private int replyReceivedForOffset = -1;
39 // if replyStatus is false, the previous chunk is attempted
40 private boolean replyStatus = false;
41 private int chunkIndex = FIRST_CHUNK_INDEX;
42 private int totalChunks;
43 private int lastChunkHashCode = INITIAL_LAST_CHUNK_HASH_CODE;
44 private int nextChunkHashCode = INITIAL_LAST_CHUNK_HASH_CODE;
45 private long snapshotSize;
46 private InputStream snapshotInputStream;
48 LeaderInstallSnapshotState(int snapshotChunkSize, String logName) {
49 this.snapshotChunkSize = snapshotChunkSize;
50 this.logName = logName;
53 void setSnapshotBytes(ByteSource snapshotBytes) throws IOException {
54 if (this.snapshotBytes != null) {
58 snapshotSize = snapshotBytes.size();
59 snapshotInputStream = snapshotBytes.openStream();
61 this.snapshotBytes = snapshotBytes;
63 totalChunks = (int) (snapshotSize / snapshotChunkSize + (snapshotSize % snapshotChunkSize > 0 ? 1 : 0));
65 LOG.debug("{}: Snapshot {} bytes, total chunks to send: {}", logName, snapshotSize, totalChunks);
67 replyReceivedForOffset = -1;
68 chunkIndex = FIRST_CHUNK_INDEX;
71 int incrementOffset() {
73 // if prev chunk failed, we would want to sent the same chunk again
74 offset = offset + snapshotChunkSize;
79 int incrementChunkIndex() {
81 // if prev chunk failed, we would want to sent the same chunk again
82 chunkIndex = chunkIndex + 1;
91 int getTotalChunks() {
95 boolean canSendNextChunk() {
96 // we only send a false if a chunk is sent but we have not received a reply yet
97 return snapshotBytes != null && replyReceivedForOffset == offset;
100 boolean isLastChunk(int index) {
101 return totalChunks == index;
104 void markSendStatus(boolean success) {
106 // if the chunk sent was successful
107 replyReceivedForOffset = offset;
109 lastChunkHashCode = nextChunkHashCode;
111 // if the chunk sent was failure
112 replyReceivedForOffset = offset;
117 byte[] getNextChunk() throws IOException {
118 int start = incrementOffset();
119 int size = snapshotChunkSize;
120 if (snapshotChunkSize > snapshotSize) {
121 size = (int) snapshotSize;
122 } else if (start + snapshotChunkSize > snapshotSize) {
123 size = (int) (snapshotSize - start);
126 byte[] nextChunk = new byte[size];
127 int numRead = snapshotInputStream.read(nextChunk);
128 if (numRead != size) {
129 throw new IOException(String.format(
130 "The # of bytes read from the imput stream, %d, does not match the expected # %d", numRead, size));
133 nextChunkHashCode = Arrays.hashCode(nextChunk);
135 LOG.debug("{}: Next chunk: total length={}, offset={}, size={}, hashCode={}", logName,
136 snapshotSize, start, size, nextChunkHashCode);
141 * Reset should be called when the Follower needs to be sent the snapshot from the beginning.
148 replyReceivedForOffset = offset;
149 chunkIndex = FIRST_CHUNK_INDEX;
150 lastChunkHashCode = INITIAL_LAST_CHUNK_HASH_CODE;
153 snapshotInputStream = snapshotBytes.openStream();
154 } catch (IOException e) {
155 throw Throwables.propagate(e);
160 public void close() {
162 snapshotBytes = null;
165 private void closeStream() {
166 if (snapshotInputStream != null) {
168 snapshotInputStream.close();
169 } catch (IOException e) {
170 LOG.warn("{}: Error closing snapshot stream", logName);
173 snapshotInputStream = null;
177 int getLastChunkHashCode() {
178 return lastChunkHashCode;