Bug 7521: Convert install snapshot chunking to use streams
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / behaviors / LeaderInstallSnapshotState.java
1 /*
2  * Copyright (c) 2016 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.raft.behaviors;
9
10 import com.google.common.base.Throwables;
11 import com.google.common.io.ByteSource;
12 import java.io.IOException;
13 import java.io.InputStream;
14 import java.util.Arrays;
15 import org.slf4j.Logger;
16 import org.slf4j.LoggerFactory;
17
18 /**
19  * Encapsulates the leader state and logic for sending snapshot chunks to a follower.
20  */
21 public final class LeaderInstallSnapshotState implements AutoCloseable {
22     private static final Logger LOG = LoggerFactory.getLogger(LeaderInstallSnapshotState.class);
23
24     // The index of the first chunk that is sent when installing a snapshot
25     static final int FIRST_CHUNK_INDEX = 1;
26
27     // The index that the follower should respond with if it needs the install snapshot to be reset
28     static final int INVALID_CHUNK_INDEX = -1;
29
30     // This would be passed as the hash code of the last chunk when sending the first chunk
31     static final int INITIAL_LAST_CHUNK_HASH_CODE = -1;
32
33     private final int snapshotChunkSize;
34     private final String logName;
35     private ByteSource snapshotBytes;
36     private int offset = 0;
37     // the next snapshot chunk is sent only if the replyReceivedForOffset matches offset
38     private int replyReceivedForOffset = -1;
39     // if replyStatus is false, the previous chunk is attempted
40     private boolean replyStatus = false;
41     private int chunkIndex = FIRST_CHUNK_INDEX;
42     private int totalChunks;
43     private int lastChunkHashCode = INITIAL_LAST_CHUNK_HASH_CODE;
44     private int nextChunkHashCode = INITIAL_LAST_CHUNK_HASH_CODE;
45     private long snapshotSize;
46     private InputStream snapshotInputStream;
47
48     LeaderInstallSnapshotState(int snapshotChunkSize, String logName) {
49         this.snapshotChunkSize = snapshotChunkSize;
50         this.logName = logName;
51     }
52
53     void setSnapshotBytes(ByteSource snapshotBytes) throws IOException {
54         if (this.snapshotBytes != null) {
55             return;
56         }
57
58         snapshotSize = snapshotBytes.size();
59         snapshotInputStream = snapshotBytes.openStream();
60
61         this.snapshotBytes = snapshotBytes;
62
63         totalChunks = (int) (snapshotSize / snapshotChunkSize + (snapshotSize % snapshotChunkSize > 0 ? 1 : 0));
64
65         LOG.debug("{}: Snapshot {} bytes, total chunks to send: {}", logName, snapshotSize, totalChunks);
66
67         replyReceivedForOffset = -1;
68         chunkIndex = FIRST_CHUNK_INDEX;
69     }
70
71     int incrementOffset() {
72         if (replyStatus) {
73             // if prev chunk failed, we would want to sent the same chunk again
74             offset = offset + snapshotChunkSize;
75         }
76         return offset;
77     }
78
79     int incrementChunkIndex() {
80         if (replyStatus) {
81             // if prev chunk failed, we would want to sent the same chunk again
82             chunkIndex =  chunkIndex + 1;
83         }
84         return chunkIndex;
85     }
86
87     int getChunkIndex() {
88         return chunkIndex;
89     }
90
91     int getTotalChunks() {
92         return totalChunks;
93     }
94
95     boolean canSendNextChunk() {
96         // we only send a false if a chunk is sent but we have not received a reply yet
97         return snapshotBytes != null && replyReceivedForOffset == offset;
98     }
99
100     boolean isLastChunk(int index) {
101         return totalChunks == index;
102     }
103
104     void markSendStatus(boolean success) {
105         if (success) {
106             // if the chunk sent was successful
107             replyReceivedForOffset = offset;
108             replyStatus = true;
109             lastChunkHashCode = nextChunkHashCode;
110         } else {
111             // if the chunk sent was failure
112             replyReceivedForOffset = offset;
113             replyStatus = false;
114         }
115     }
116
117     byte[] getNextChunk() throws IOException {
118         int start = incrementOffset();
119         int size = snapshotChunkSize;
120         if (snapshotChunkSize > snapshotSize) {
121             size = (int) snapshotSize;
122         } else if (start + snapshotChunkSize > snapshotSize) {
123             size = (int) (snapshotSize - start);
124         }
125
126         byte[] nextChunk = new byte[size];
127         int numRead = snapshotInputStream.read(nextChunk);
128         if (numRead != size) {
129             throw new IOException(String.format(
130                     "The # of bytes read from the imput stream, %d, does not match the expected # %d", numRead, size));
131         }
132
133         nextChunkHashCode = Arrays.hashCode(nextChunk);
134
135         LOG.debug("{}: Next chunk: total length={}, offset={}, size={}, hashCode={}", logName,
136                 snapshotSize, start, size, nextChunkHashCode);
137         return nextChunk;
138     }
139
140     /**
141      * Reset should be called when the Follower needs to be sent the snapshot from the beginning.
142      */
143     void reset() {
144         closeStream();
145
146         offset = 0;
147         replyStatus = false;
148         replyReceivedForOffset = offset;
149         chunkIndex = FIRST_CHUNK_INDEX;
150         lastChunkHashCode = INITIAL_LAST_CHUNK_HASH_CODE;
151
152         try {
153             snapshotInputStream = snapshotBytes.openStream();
154         } catch (IOException e) {
155             throw Throwables.propagate(e);
156         }
157     }
158
159     @Override
160     public void close() {
161         closeStream();
162         snapshotBytes = null;
163     }
164
165     private void closeStream() {
166         if (snapshotInputStream != null) {
167             try {
168                 snapshotInputStream.close();
169             } catch (IOException e) {
170                 LOG.warn("{}: Error closing snapshot stream", logName);
171             }
172
173             snapshotInputStream = null;
174         }
175     }
176
177     int getLastChunkHashCode() {
178         return lastChunkHashCode;
179     }
180 }