Cleanup warnings
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / behaviors / LeaderInstallSnapshotState.java
1 /*
2  * Copyright (c) 2016 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.raft.behaviors;
9
10 import com.google.common.io.ByteSource;
11 import java.io.IOException;
12 import java.io.InputStream;
13 import java.util.Arrays;
14 import org.slf4j.Logger;
15 import org.slf4j.LoggerFactory;
16
17 /**
18  * Encapsulates the leader state and logic for sending snapshot chunks to a follower.
19  */
20 public final class LeaderInstallSnapshotState implements AutoCloseable {
21     private static final Logger LOG = LoggerFactory.getLogger(LeaderInstallSnapshotState.class);
22
23     // The index of the first chunk that is sent when installing a snapshot
24     static final int FIRST_CHUNK_INDEX = 1;
25
26     // The index that the follower should respond with if it needs the install snapshot to be reset
27     static final int INVALID_CHUNK_INDEX = -1;
28
29     // This would be passed as the hash code of the last chunk when sending the first chunk
30     static final int INITIAL_LAST_CHUNK_HASH_CODE = -1;
31
32     private final int snapshotChunkSize;
33     private final String logName;
34     private ByteSource snapshotBytes;
35     private int offset = 0;
36     // the next snapshot chunk is sent only if the replyReceivedForOffset matches offset
37     private int replyReceivedForOffset = -1;
38     // if replyStatus is false, the previous chunk is attempted
39     private boolean replyStatus = false;
40     private int chunkIndex = FIRST_CHUNK_INDEX;
41     private int totalChunks;
42     private int lastChunkHashCode = INITIAL_LAST_CHUNK_HASH_CODE;
43     private int nextChunkHashCode = INITIAL_LAST_CHUNK_HASH_CODE;
44     private long snapshotSize;
45     private InputStream snapshotInputStream;
46
47     LeaderInstallSnapshotState(final int snapshotChunkSize, final String logName) {
48         this.snapshotChunkSize = snapshotChunkSize;
49         this.logName = logName;
50     }
51
52     void setSnapshotBytes(final ByteSource snapshotBytes) throws IOException {
53         if (this.snapshotBytes != null) {
54             return;
55         }
56
57         snapshotSize = snapshotBytes.size();
58         snapshotInputStream = snapshotBytes.openStream();
59
60         this.snapshotBytes = snapshotBytes;
61
62         totalChunks = (int) (snapshotSize / snapshotChunkSize + (snapshotSize % snapshotChunkSize > 0 ? 1 : 0));
63
64         LOG.debug("{}: Snapshot {} bytes, total chunks to send: {}", logName, snapshotSize, totalChunks);
65
66         replyReceivedForOffset = -1;
67         chunkIndex = FIRST_CHUNK_INDEX;
68     }
69
70     int incrementOffset() {
71         if (replyStatus) {
72             // if prev chunk failed, we would want to sent the same chunk again
73             offset = offset + snapshotChunkSize;
74         }
75         return offset;
76     }
77
78     int incrementChunkIndex() {
79         if (replyStatus) {
80             // if prev chunk failed, we would want to sent the same chunk again
81             chunkIndex =  chunkIndex + 1;
82         }
83         return chunkIndex;
84     }
85
86     int getChunkIndex() {
87         return chunkIndex;
88     }
89
90     int getTotalChunks() {
91         return totalChunks;
92     }
93
94     boolean canSendNextChunk() {
95         // we only send a false if a chunk is sent but we have not received a reply yet
96         return snapshotBytes != null && (nextChunkHashCode == INITIAL_LAST_CHUNK_HASH_CODE
97                 || replyReceivedForOffset == offset);
98     }
99
100     boolean isLastChunk(final int index) {
101         return totalChunks == index;
102     }
103
104     void markSendStatus(final boolean success) {
105         if (success) {
106             // if the chunk sent was successful
107             replyReceivedForOffset = offset;
108             replyStatus = true;
109             lastChunkHashCode = nextChunkHashCode;
110         } else {
111             // if the chunk sent was failure
112             replyReceivedForOffset = offset;
113             replyStatus = false;
114         }
115     }
116
117     byte[] getNextChunk() throws IOException {
118         int start = incrementOffset();
119         int size = snapshotChunkSize;
120         if (snapshotChunkSize > snapshotSize) {
121             size = (int) snapshotSize;
122         } else if (start + snapshotChunkSize > snapshotSize) {
123             size = (int) (snapshotSize - start);
124         }
125
126         byte[] nextChunk = new byte[size];
127         int numRead = snapshotInputStream.read(nextChunk);
128         if (numRead != size) {
129             throw new IOException(String.format(
130                     "The # of bytes read from the input stream, %d, does not match the expected # %d", numRead, size));
131         }
132
133         nextChunkHashCode = Arrays.hashCode(nextChunk);
134
135         LOG.debug("{}: Next chunk: total length={}, offset={}, size={}, hashCode={}", logName,
136                 snapshotSize, start, size, nextChunkHashCode);
137         return nextChunk;
138     }
139
140     /**
141      * Reset should be called when the Follower needs to be sent the snapshot from the beginning.
142      */
143     void reset() {
144         closeStream();
145
146         offset = 0;
147         replyStatus = false;
148         replyReceivedForOffset = offset;
149         chunkIndex = FIRST_CHUNK_INDEX;
150         lastChunkHashCode = INITIAL_LAST_CHUNK_HASH_CODE;
151
152         try {
153             snapshotInputStream = snapshotBytes.openStream();
154         } catch (IOException e) {
155             throw new RuntimeException(e);
156         }
157     }
158
159     @Override
160     public void close() {
161         closeStream();
162         snapshotBytes = null;
163     }
164
165     private void closeStream() {
166         if (snapshotInputStream != null) {
167             try {
168                 snapshotInputStream.close();
169             } catch (IOException e) {
170                 LOG.warn("{}: Error closing snapshot stream", logName);
171             }
172
173             snapshotInputStream = null;
174         }
175     }
176
177     int getLastChunkHashCode() {
178         return lastChunkHashCode;
179     }
180 }