Failed chunks should move offset backwards
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / behaviors / LeaderInstallSnapshotState.java
1 /*
2  * Copyright (c) 2016 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.raft.behaviors;
9
10 import com.google.common.base.MoreObjects;
11 import com.google.common.base.Stopwatch;
12 import com.google.common.io.ByteSource;
13 import java.io.IOException;
14 import java.io.InputStream;
15 import java.util.Arrays;
16 import java.util.concurrent.TimeUnit;
17 import org.slf4j.Logger;
18 import org.slf4j.LoggerFactory;
19 import scala.concurrent.duration.FiniteDuration;
20
21 /**
22  * Encapsulates the leader state and logic for sending snapshot chunks to a follower.
23  */
24 public final class LeaderInstallSnapshotState implements AutoCloseable {
25     private static final Logger LOG = LoggerFactory.getLogger(LeaderInstallSnapshotState.class);
26
27     // The index of the first chunk that is sent when installing a snapshot
28     static final int FIRST_CHUNK_INDEX = 1;
29
30     // The index that the follower should respond with if it needs the install snapshot to be reset
31     static final int INVALID_CHUNK_INDEX = -1;
32
33     // This would be passed as the hash code of the last chunk when sending the first chunk
34     static final int INITIAL_LAST_CHUNK_HASH_CODE = -1;
35
36     private final int snapshotChunkSize;
37     private final String logName;
38     private ByteSource snapshotBytes;
39     private int offset = 0;
40     // the next snapshot chunk is sent only if the replyReceivedForOffset matches offset
41     private int replyReceivedForOffset = -1;
42     // if replyStatus is false, the previous chunk is attempted
43     private boolean replyStatus = false;
44     private int chunkIndex = FIRST_CHUNK_INDEX;
45     private int totalChunks;
46     private int lastChunkHashCode = INITIAL_LAST_CHUNK_HASH_CODE;
47     private int nextChunkHashCode = INITIAL_LAST_CHUNK_HASH_CODE;
48     private long snapshotSize;
49     private InputStream snapshotInputStream;
50     private Stopwatch chunkTimer = Stopwatch.createUnstarted();
51     private byte[] currentChunk = null;
52
53     LeaderInstallSnapshotState(final int snapshotChunkSize, final String logName) {
54         this.snapshotChunkSize = snapshotChunkSize;
55         this.logName = logName;
56     }
57
58     void setSnapshotBytes(final ByteSource snapshotBytes) throws IOException {
59         if (this.snapshotBytes != null) {
60             return;
61         }
62
63         snapshotSize = snapshotBytes.size();
64         snapshotInputStream = snapshotBytes.openStream();
65
66         this.snapshotBytes = snapshotBytes;
67
68         totalChunks = (int) (snapshotSize / snapshotChunkSize + (snapshotSize % snapshotChunkSize > 0 ? 1 : 0));
69
70         LOG.debug("{}: Snapshot {} bytes, total chunks to send: {}", logName, snapshotSize, totalChunks);
71
72         replyReceivedForOffset = -1;
73         chunkIndex = FIRST_CHUNK_INDEX;
74     }
75
76     int incrementOffset() {
77         // if first chunk was retried, reset offset back to initial 0
78         if (offset == -1) {
79             offset = 0;
80         }
81         if (replyStatus) {
82             // if prev chunk failed, we would want to send the same chunk again
83             offset = offset + snapshotChunkSize;
84         }
85         return offset;
86     }
87
88     int incrementChunkIndex() {
89         if (replyStatus) {
90             // if prev chunk failed, we would want to send the same chunk again
91             chunkIndex =  chunkIndex + 1;
92         }
93         return chunkIndex;
94     }
95
96     void startChunkTimer() {
97         chunkTimer.start();
98     }
99
100     void resetChunkTimer() {
101         chunkTimer.reset();
102     }
103
104     boolean isChunkTimedOut(final FiniteDuration timeout) {
105         return chunkTimer.elapsed(TimeUnit.SECONDS) > timeout.toSeconds();
106     }
107
108     int getChunkIndex() {
109         return chunkIndex;
110     }
111
112     int getTotalChunks() {
113         return totalChunks;
114     }
115
116     boolean canSendNextChunk() {
117         // we only send a false if a chunk is sent but we have not received a reply yet
118         return snapshotBytes != null && (nextChunkHashCode == INITIAL_LAST_CHUNK_HASH_CODE
119                 || replyReceivedForOffset == offset);
120     }
121
122     boolean isLastChunk(final int index) {
123         return totalChunks == index;
124     }
125
126     void markSendStatus(final boolean success) {
127         if (success) {
128             // if the chunk sent was successful
129             replyReceivedForOffset = offset;
130             replyStatus = true;
131             lastChunkHashCode = nextChunkHashCode;
132         } else {
133             // if the chunk sent was failure, revert offset to previous so we can retry
134             offset = replyReceivedForOffset;
135             replyStatus = false;
136         }
137     }
138
139     byte[] getNextChunk() throws IOException {
140         // increment offset to indicate next chunk is in flight, canSendNextChunk() wont let us hit this again until,
141         // markSendStatus() is called with either success or failure
142         int start = incrementOffset();
143         if (replyStatus || currentChunk == null) {
144             int size = snapshotChunkSize;
145             if (snapshotChunkSize > snapshotSize) {
146                 size = (int) snapshotSize;
147             } else if (start + snapshotChunkSize > snapshotSize) {
148                 size = (int) (snapshotSize - start);
149             }
150
151             currentChunk = new byte[size];
152             int numRead = snapshotInputStream.read(currentChunk);
153             if (numRead != size) {
154                 throw new IOException(String.format(
155                         "The # of bytes read from the input stream, %d,"
156                                 + "does not match the expected # %d", numRead, size));
157             }
158
159             nextChunkHashCode = Arrays.hashCode(currentChunk);
160
161             LOG.debug("{}: Next chunk: total length={}, offset={}, size={}, hashCode={}", logName,
162                     snapshotSize, start, size, nextChunkHashCode);
163         }
164
165         return currentChunk;
166     }
167
168     /**
169      * Reset should be called when the Follower needs to be sent the snapshot from the beginning.
170      */
171     void reset() {
172         closeStream();
173         chunkTimer.reset();
174
175         offset = 0;
176         replyStatus = false;
177         replyReceivedForOffset = offset;
178         chunkIndex = FIRST_CHUNK_INDEX;
179         currentChunk = null;
180         lastChunkHashCode = INITIAL_LAST_CHUNK_HASH_CODE;
181
182         try {
183             snapshotInputStream = snapshotBytes.openStream();
184         } catch (IOException e) {
185             throw new RuntimeException(e);
186         }
187     }
188
189     @Override
190     public void close() {
191         closeStream();
192         snapshotBytes = null;
193     }
194
195     private void closeStream() {
196         if (snapshotInputStream != null) {
197             try {
198                 snapshotInputStream.close();
199             } catch (IOException e) {
200                 LOG.warn("{}: Error closing snapshot stream", logName);
201             }
202
203             snapshotInputStream = null;
204         }
205     }
206
207     int getLastChunkHashCode() {
208         return lastChunkHashCode;
209     }
210
211     @Override
212     public String toString() {
213         return MoreObjects.toStringHelper(this)
214                 .add("snapshotChunkSize", snapshotChunkSize)
215                 .add("offset", offset)
216                 .add("replyReceivedForOffset", replyReceivedForOffset)
217                 .add("replyStatus", replyStatus)
218                 .add("chunkIndex", chunkIndex)
219                 .add("totalChunks", totalChunks)
220                 .add("lastChunkHashCode", lastChunkHashCode)
221                 .add("nextChunkHashCode", nextChunkHashCode)
222                 .add("snapshotSize", snapshotSize)
223                 .add("chunkTimer", chunkTimer)
224                 .toString();
225     }
226 }