Fixup chunk offset movement on resend
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / behaviors / LeaderInstallSnapshotState.java
1 /*
2  * Copyright (c) 2016 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.raft.behaviors;
9
10 import com.google.common.base.MoreObjects;
11 import com.google.common.base.Stopwatch;
12 import com.google.common.io.ByteSource;
13 import java.io.IOException;
14 import java.io.InputStream;
15 import java.util.Arrays;
16 import java.util.concurrent.TimeUnit;
17 import org.slf4j.Logger;
18 import org.slf4j.LoggerFactory;
19 import scala.concurrent.duration.FiniteDuration;
20
21 /**
22  * Encapsulates the leader state and logic for sending snapshot chunks to a follower.
23  */
24 public final class LeaderInstallSnapshotState implements AutoCloseable {
25     private static final Logger LOG = LoggerFactory.getLogger(LeaderInstallSnapshotState.class);
26
27     // The index of the first chunk that is sent when installing a snapshot
28     static final int FIRST_CHUNK_INDEX = 1;
29
30     // The index that the follower should respond with if it needs the install snapshot to be reset
31     static final int INVALID_CHUNK_INDEX = -1;
32
33     static final int INITIAL_OFFSET = -1;
34
35     // This would be passed as the hash code of the last chunk when sending the first chunk
36     static final int INITIAL_LAST_CHUNK_HASH_CODE = -1;
37
38     private final int snapshotChunkSize;
39     private final String logName;
40     private ByteSource snapshotBytes;
41     private int offset = INITIAL_OFFSET;
42     // the next snapshot chunk is sent only if the replyReceivedForOffset matches offset
43     private int replyReceivedForOffset = -1;
44     // if replyStatus is false, the previous chunk is attempted
45     private boolean replyStatus = false;
46     private int chunkIndex = FIRST_CHUNK_INDEX;
47     private int totalChunks;
48     private int lastChunkHashCode = INITIAL_LAST_CHUNK_HASH_CODE;
49     private int nextChunkHashCode = INITIAL_LAST_CHUNK_HASH_CODE;
50     private long snapshotSize;
51     private InputStream snapshotInputStream;
52     private Stopwatch chunkTimer = Stopwatch.createUnstarted();
53     private byte[] currentChunk = null;
54
55     LeaderInstallSnapshotState(final int snapshotChunkSize, final String logName) {
56         this.snapshotChunkSize = snapshotChunkSize;
57         this.logName = logName;
58     }
59
60     void setSnapshotBytes(final ByteSource snapshotBytes) throws IOException {
61         if (this.snapshotBytes != null) {
62             return;
63         }
64
65         snapshotSize = snapshotBytes.size();
66         snapshotInputStream = snapshotBytes.openStream();
67
68         this.snapshotBytes = snapshotBytes;
69
70         totalChunks = (int) (snapshotSize / snapshotChunkSize + (snapshotSize % snapshotChunkSize > 0 ? 1 : 0));
71
72         LOG.debug("{}: Snapshot {} bytes, total chunks to send: {}", logName, snapshotSize, totalChunks);
73
74         replyReceivedForOffset = INITIAL_OFFSET;
75         chunkIndex = FIRST_CHUNK_INDEX;
76     }
77
78     int incrementOffset() {
79         // if offset is -1 doesnt matter whether it was the initial value or reset, move the offset to 0 to begin with
80         if (offset == INITIAL_OFFSET) {
81             offset = 0;
82         } else {
83             offset = offset + snapshotChunkSize;
84         }
85         return offset;
86     }
87
88     int incrementChunkIndex() {
89         if (replyStatus) {
90             // if prev chunk failed, we would want to send the same chunk again
91             chunkIndex =  chunkIndex + 1;
92         }
93         return chunkIndex;
94     }
95
96     void startChunkTimer() {
97         chunkTimer.start();
98     }
99
100     void resetChunkTimer() {
101         chunkTimer.reset();
102     }
103
104     boolean isChunkTimedOut(final FiniteDuration timeout) {
105         return chunkTimer.elapsed(TimeUnit.SECONDS) > timeout.toSeconds();
106     }
107
108     int getChunkIndex() {
109         return chunkIndex;
110     }
111
112     int getTotalChunks() {
113         return totalChunks;
114     }
115
116     boolean canSendNextChunk() {
117         // we only send a false if a chunk is sent but we have not received a reply yet
118         return snapshotBytes != null && (nextChunkHashCode == INITIAL_LAST_CHUNK_HASH_CODE
119                 || replyReceivedForOffset == offset);
120     }
121
122     boolean isLastChunk(final int index) {
123         return totalChunks == index;
124     }
125
126     void markSendStatus(final boolean success) {
127         if (success) {
128             // if the chunk sent was successful
129             replyReceivedForOffset = offset;
130             replyStatus = true;
131             lastChunkHashCode = nextChunkHashCode;
132         } else {
133             // if the chunk sent was failure, revert offset to previous so we can retry
134             offset = replyReceivedForOffset;
135             replyStatus = false;
136         }
137     }
138
139     byte[] getNextChunk() throws IOException {
140         // increment offset to indicate next chunk is in flight, canSendNextChunk() wont let us hit this again until,
141         // markSendStatus() is called with either success or failure
142         int start = incrementOffset();
143         if (replyStatus || currentChunk == null) {
144             int size = snapshotChunkSize;
145             if (snapshotChunkSize > snapshotSize) {
146                 size = (int) snapshotSize;
147             } else if (start + snapshotChunkSize > snapshotSize) {
148                 size = (int) (snapshotSize - start);
149             }
150
151             currentChunk = new byte[size];
152             int numRead = snapshotInputStream.read(currentChunk);
153             if (numRead != size) {
154                 throw new IOException(String.format(
155                         "The # of bytes read from the input stream, %d,"
156                                 + "does not match the expected # %d", numRead, size));
157             }
158
159             nextChunkHashCode = Arrays.hashCode(currentChunk);
160
161             LOG.debug("{}: Next chunk: total length={}, offset={}, size={}, hashCode={}", logName,
162                     snapshotSize, start, size, nextChunkHashCode);
163         }
164
165         return currentChunk;
166     }
167
168     /**
169      * Reset should be called when the Follower needs to be sent the snapshot from the beginning.
170      */
171     void reset() {
172         closeStream();
173         chunkTimer.reset();
174
175         offset = INITIAL_OFFSET;
176         replyStatus = false;
177         replyReceivedForOffset = INITIAL_OFFSET;
178         chunkIndex = FIRST_CHUNK_INDEX;
179         currentChunk = null;
180         lastChunkHashCode = INITIAL_LAST_CHUNK_HASH_CODE;
181         nextChunkHashCode = INITIAL_LAST_CHUNK_HASH_CODE;
182
183         try {
184             snapshotInputStream = snapshotBytes.openStream();
185         } catch (IOException e) {
186             throw new RuntimeException(e);
187         }
188     }
189
190     @Override
191     public void close() {
192         closeStream();
193         snapshotBytes = null;
194     }
195
196     private void closeStream() {
197         if (snapshotInputStream != null) {
198             try {
199                 snapshotInputStream.close();
200             } catch (IOException e) {
201                 LOG.warn("{}: Error closing snapshot stream", logName);
202             }
203
204             snapshotInputStream = null;
205         }
206     }
207
208     int getLastChunkHashCode() {
209         return lastChunkHashCode;
210     }
211
212     @Override
213     public String toString() {
214         return MoreObjects.toStringHelper(this)
215                 .add("snapshotChunkSize", snapshotChunkSize)
216                 .add("offset", offset)
217                 .add("replyReceivedForOffset", replyReceivedForOffset)
218                 .add("replyStatus", replyStatus)
219                 .add("chunkIndex", chunkIndex)
220                 .add("totalChunks", totalChunks)
221                 .add("lastChunkHashCode", lastChunkHashCode)
222                 .add("nextChunkHashCode", nextChunkHashCode)
223                 .add("snapshotSize", snapshotSize)
224                 .add("chunkTimer", chunkTimer)
225                 .toString();
226     }
227 }