Reset snapshot progress on IOExceptions
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / behaviors / LeaderInstallSnapshotState.java
1 /*
2  * Copyright (c) 2016 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.raft.behaviors;
9
10 import com.google.common.base.MoreObjects;
11 import com.google.common.base.Stopwatch;
12 import com.google.common.io.ByteSource;
13 import java.io.IOException;
14 import java.io.InputStream;
15 import java.util.Arrays;
16 import java.util.concurrent.TimeUnit;
17 import org.slf4j.Logger;
18 import org.slf4j.LoggerFactory;
19 import scala.concurrent.duration.FiniteDuration;
20
21 /**
22  * Encapsulates the leader state and logic for sending snapshot chunks to a follower.
23  */
24 public final class LeaderInstallSnapshotState implements AutoCloseable {
25     private static final Logger LOG = LoggerFactory.getLogger(LeaderInstallSnapshotState.class);
26
27     // The index of the first chunk that is sent when installing a snapshot
28     static final int FIRST_CHUNK_INDEX = 1;
29
30     // The index that the follower should respond with if it needs the install snapshot to be reset
31     static final int INVALID_CHUNK_INDEX = -1;
32
33     // This would be passed as the hash code of the last chunk when sending the first chunk
34     static final int INITIAL_LAST_CHUNK_HASH_CODE = -1;
35
36     private final int snapshotChunkSize;
37     private final String logName;
38     private ByteSource snapshotBytes;
39     private int offset = 0;
40     // the next snapshot chunk is sent only if the replyReceivedForOffset matches offset
41     private int replyReceivedForOffset = -1;
42     // if replyStatus is false, the previous chunk is attempted
43     private boolean replyStatus = false;
44     private int chunkIndex = FIRST_CHUNK_INDEX;
45     private int totalChunks;
46     private int lastChunkHashCode = INITIAL_LAST_CHUNK_HASH_CODE;
47     private int nextChunkHashCode = INITIAL_LAST_CHUNK_HASH_CODE;
48     private long snapshotSize;
49     private InputStream snapshotInputStream;
50     private Stopwatch chunkTimer = Stopwatch.createUnstarted();
51     private byte[] currentChunk = null;
52
53     LeaderInstallSnapshotState(final int snapshotChunkSize, final String logName) {
54         this.snapshotChunkSize = snapshotChunkSize;
55         this.logName = logName;
56     }
57
58     void setSnapshotBytes(final ByteSource snapshotBytes) throws IOException {
59         if (this.snapshotBytes != null) {
60             return;
61         }
62
63         snapshotSize = snapshotBytes.size();
64         snapshotInputStream = snapshotBytes.openStream();
65
66         this.snapshotBytes = snapshotBytes;
67
68         totalChunks = (int) (snapshotSize / snapshotChunkSize + (snapshotSize % snapshotChunkSize > 0 ? 1 : 0));
69
70         LOG.debug("{}: Snapshot {} bytes, total chunks to send: {}", logName, snapshotSize, totalChunks);
71
72         replyReceivedForOffset = -1;
73         chunkIndex = FIRST_CHUNK_INDEX;
74     }
75
76     int incrementOffset() {
77         if (replyStatus) {
78             // if prev chunk failed, we would want to sent the same chunk again
79             offset = offset + snapshotChunkSize;
80         }
81         return offset;
82     }
83
84     int incrementChunkIndex() {
85         if (replyStatus) {
86             // if prev chunk failed, we would want to sent the same chunk again
87             chunkIndex =  chunkIndex + 1;
88         }
89         return chunkIndex;
90     }
91
92     void startChunkTimer() {
93         chunkTimer.start();
94     }
95
96     void resetChunkTimer() {
97         chunkTimer.reset();
98     }
99
100     boolean isChunkTimedOut(final FiniteDuration timeout) {
101         return chunkTimer.elapsed(TimeUnit.SECONDS) > timeout.toSeconds();
102     }
103
104     int getChunkIndex() {
105         return chunkIndex;
106     }
107
108     int getTotalChunks() {
109         return totalChunks;
110     }
111
112     boolean canSendNextChunk() {
113         // we only send a false if a chunk is sent but we have not received a reply yet
114         return snapshotBytes != null && (nextChunkHashCode == INITIAL_LAST_CHUNK_HASH_CODE
115                 || replyReceivedForOffset == offset);
116     }
117
118     boolean isLastChunk(final int index) {
119         return totalChunks == index;
120     }
121
122     void markSendStatus(final boolean success) {
123         if (success) {
124             // if the chunk sent was successful
125             replyReceivedForOffset = offset;
126             replyStatus = true;
127             lastChunkHashCode = nextChunkHashCode;
128         } else {
129             // if the chunk sent was failure
130             replyReceivedForOffset = offset;
131             replyStatus = false;
132         }
133     }
134
135     byte[] getNextChunk() throws IOException {
136         if (replyStatus || currentChunk == null) {
137             int start = incrementOffset();
138             int size = snapshotChunkSize;
139             if (snapshotChunkSize > snapshotSize) {
140                 size = (int) snapshotSize;
141             } else if (start + snapshotChunkSize > snapshotSize) {
142                 size = (int) (snapshotSize - start);
143             }
144
145             currentChunk = new byte[size];
146             int numRead = snapshotInputStream.read(currentChunk);
147             if (numRead != size) {
148                 throw new IOException(String.format(
149                         "The # of bytes read from the input stream, %d,"
150                                 + "does not match the expected # %d", numRead, size));
151             }
152
153             nextChunkHashCode = Arrays.hashCode(currentChunk);
154
155             LOG.debug("{}: Next chunk: total length={}, offset={}, size={}, hashCode={}", logName,
156                     snapshotSize, start, size, nextChunkHashCode);
157         }
158
159         return currentChunk;
160     }
161
162     /**
163      * Reset should be called when the Follower needs to be sent the snapshot from the beginning.
164      */
165     void reset() {
166         closeStream();
167         chunkTimer.reset();
168
169         offset = 0;
170         replyStatus = false;
171         replyReceivedForOffset = offset;
172         chunkIndex = FIRST_CHUNK_INDEX;
173         currentChunk = null;
174         lastChunkHashCode = INITIAL_LAST_CHUNK_HASH_CODE;
175
176         try {
177             snapshotInputStream = snapshotBytes.openStream();
178         } catch (IOException e) {
179             throw new RuntimeException(e);
180         }
181     }
182
183     @Override
184     public void close() {
185         closeStream();
186         snapshotBytes = null;
187     }
188
189     private void closeStream() {
190         if (snapshotInputStream != null) {
191             try {
192                 snapshotInputStream.close();
193             } catch (IOException e) {
194                 LOG.warn("{}: Error closing snapshot stream", logName);
195             }
196
197             snapshotInputStream = null;
198         }
199     }
200
201     int getLastChunkHashCode() {
202         return lastChunkHashCode;
203     }
204
205     @Override
206     public String toString() {
207         return MoreObjects.toStringHelper(this)
208                 .add("snapshotChunkSize", snapshotChunkSize)
209                 .add("offset", offset)
210                 .add("replyReceivedForOffset", replyReceivedForOffset)
211                 .add("replyStatus", replyStatus)
212                 .add("chunkIndex", chunkIndex)
213                 .add("totalChunks", totalChunks)
214                 .add("lastChunkHashCode", lastChunkHashCode)
215                 .add("nextChunkHashCode", nextChunkHashCode)
216                 .add("snapshotSize", snapshotSize)
217                 .add("chunkTimer", chunkTimer)
218                 .toString();
219     }
220 }