23a0f6d027c53841644a6ab1c5273708a9e0a011
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / behaviors / LeaderInstallSnapshotState.java
1 /*
2  * Copyright (c) 2016 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.raft.behaviors;
9
10 import com.google.common.base.Stopwatch;
11 import com.google.common.io.ByteSource;
12 import java.io.IOException;
13 import java.io.InputStream;
14 import java.util.Arrays;
15 import java.util.concurrent.TimeUnit;
16 import org.slf4j.Logger;
17 import org.slf4j.LoggerFactory;
18 import scala.concurrent.duration.FiniteDuration;
19
20 /**
21  * Encapsulates the leader state and logic for sending snapshot chunks to a follower.
22  */
23 public final class LeaderInstallSnapshotState implements AutoCloseable {
24     private static final Logger LOG = LoggerFactory.getLogger(LeaderInstallSnapshotState.class);
25
26     // The index of the first chunk that is sent when installing a snapshot
27     static final int FIRST_CHUNK_INDEX = 1;
28
29     // The index that the follower should respond with if it needs the install snapshot to be reset
30     static final int INVALID_CHUNK_INDEX = -1;
31
32     // This would be passed as the hash code of the last chunk when sending the first chunk
33     static final int INITIAL_LAST_CHUNK_HASH_CODE = -1;
34
35     private final int snapshotChunkSize;
36     private final String logName;
37     private ByteSource snapshotBytes;
38     private int offset = 0;
39     // the next snapshot chunk is sent only if the replyReceivedForOffset matches offset
40     private int replyReceivedForOffset = -1;
41     // if replyStatus is false, the previous chunk is attempted
42     private boolean replyStatus = false;
43     private int chunkIndex = FIRST_CHUNK_INDEX;
44     private int totalChunks;
45     private int lastChunkHashCode = INITIAL_LAST_CHUNK_HASH_CODE;
46     private int nextChunkHashCode = INITIAL_LAST_CHUNK_HASH_CODE;
47     private long snapshotSize;
48     private InputStream snapshotInputStream;
49     private Stopwatch chunkTimer = Stopwatch.createUnstarted();
50     private byte[] currentChunk = null;
51
52     LeaderInstallSnapshotState(final int snapshotChunkSize, final String logName) {
53         this.snapshotChunkSize = snapshotChunkSize;
54         this.logName = logName;
55     }
56
57     void setSnapshotBytes(final ByteSource snapshotBytes) throws IOException {
58         if (this.snapshotBytes != null) {
59             return;
60         }
61
62         snapshotSize = snapshotBytes.size();
63         snapshotInputStream = snapshotBytes.openStream();
64
65         this.snapshotBytes = snapshotBytes;
66
67         totalChunks = (int) (snapshotSize / snapshotChunkSize + (snapshotSize % snapshotChunkSize > 0 ? 1 : 0));
68
69         LOG.debug("{}: Snapshot {} bytes, total chunks to send: {}", logName, snapshotSize, totalChunks);
70
71         replyReceivedForOffset = -1;
72         chunkIndex = FIRST_CHUNK_INDEX;
73     }
74
75     int incrementOffset() {
76         if (replyStatus) {
77             // if prev chunk failed, we would want to sent the same chunk again
78             offset = offset + snapshotChunkSize;
79         }
80         return offset;
81     }
82
83     int incrementChunkIndex() {
84         if (replyStatus) {
85             // if prev chunk failed, we would want to sent the same chunk again
86             chunkIndex =  chunkIndex + 1;
87         }
88         return chunkIndex;
89     }
90
91     void startChunkTimer() {
92         chunkTimer.start();
93     }
94
95     void resetChunkTimer() {
96         chunkTimer.reset();
97     }
98
99     boolean isChunkTimedOut(final FiniteDuration timeout) {
100         return chunkTimer.elapsed(TimeUnit.SECONDS) > timeout.toSeconds();
101     }
102
103     int getChunkIndex() {
104         return chunkIndex;
105     }
106
107     int getTotalChunks() {
108         return totalChunks;
109     }
110
111     boolean canSendNextChunk() {
112         // we only send a false if a chunk is sent but we have not received a reply yet
113         return snapshotBytes != null && (nextChunkHashCode == INITIAL_LAST_CHUNK_HASH_CODE
114                 || replyReceivedForOffset == offset);
115     }
116
117     boolean isLastChunk(final int index) {
118         return totalChunks == index;
119     }
120
121     void markSendStatus(final boolean success) {
122         if (success) {
123             // if the chunk sent was successful
124             replyReceivedForOffset = offset;
125             replyStatus = true;
126             lastChunkHashCode = nextChunkHashCode;
127         } else {
128             // if the chunk sent was failure
129             replyReceivedForOffset = offset;
130             replyStatus = false;
131         }
132     }
133
134     byte[] getNextChunk() throws IOException {
135         if (replyStatus || currentChunk == null) {
136             int start = incrementOffset();
137             int size = snapshotChunkSize;
138             if (snapshotChunkSize > snapshotSize) {
139                 size = (int) snapshotSize;
140             } else if (start + snapshotChunkSize > snapshotSize) {
141                 size = (int) (snapshotSize - start);
142             }
143
144             currentChunk = new byte[size];
145             int numRead = snapshotInputStream.read(currentChunk);
146             if (numRead != size) {
147                 throw new IOException(String.format(
148                         "The # of bytes read from the input stream, %d,"
149                                 + "does not match the expected # %d", numRead, size));
150             }
151
152             nextChunkHashCode = Arrays.hashCode(currentChunk);
153
154             LOG.debug("{}: Next chunk: total length={}, offset={}, size={}, hashCode={}", logName,
155                     snapshotSize, start, size, nextChunkHashCode);
156         }
157
158         return currentChunk;
159     }
160
161     /**
162      * Reset should be called when the Follower needs to be sent the snapshot from the beginning.
163      */
164     void reset() {
165         closeStream();
166         chunkTimer.reset();
167
168         offset = 0;
169         replyStatus = false;
170         replyReceivedForOffset = offset;
171         chunkIndex = FIRST_CHUNK_INDEX;
172         currentChunk = null;
173         lastChunkHashCode = INITIAL_LAST_CHUNK_HASH_CODE;
174
175         try {
176             snapshotInputStream = snapshotBytes.openStream();
177         } catch (IOException e) {
178             throw new RuntimeException(e);
179         }
180     }
181
182     @Override
183     public void close() {
184         closeStream();
185         snapshotBytes = null;
186     }
187
188     private void closeStream() {
189         if (snapshotInputStream != null) {
190             try {
191                 snapshotInputStream.close();
192             } catch (IOException e) {
193                 LOG.warn("{}: Error closing snapshot stream", logName);
194             }
195
196             snapshotInputStream = null;
197         }
198     }
199
200     int getLastChunkHashCode() {
201         return lastChunkHashCode;
202     }
203 }