3b4c7d813309a3ddb0b03520ebf1254623578339
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / behaviors / LeaderInstallSnapshotState.java
1 /*
2  * Copyright (c) 2016 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.raft.behaviors;
9
10 import com.google.common.base.Throwables;
11 import com.google.common.io.ByteSource;
12 import java.io.IOException;
13 import java.io.InputStream;
14 import java.util.Arrays;
15 import org.slf4j.Logger;
16 import org.slf4j.LoggerFactory;
17
18 /**
19  * Encapsulates the leader state and logic for sending snapshot chunks to a follower.
20  */
21 public final class LeaderInstallSnapshotState implements AutoCloseable {
22     private static final Logger LOG = LoggerFactory.getLogger(LeaderInstallSnapshotState.class);
23
24     // The index of the first chunk that is sent when installing a snapshot
25     static final int FIRST_CHUNK_INDEX = 1;
26
27     // The index that the follower should respond with if it needs the install snapshot to be reset
28     static final int INVALID_CHUNK_INDEX = -1;
29
30     // This would be passed as the hash code of the last chunk when sending the first chunk
31     static final int INITIAL_LAST_CHUNK_HASH_CODE = -1;
32
33     private final int snapshotChunkSize;
34     private final String logName;
35     private ByteSource snapshotBytes;
36     private int offset = 0;
37     // the next snapshot chunk is sent only if the replyReceivedForOffset matches offset
38     private int replyReceivedForOffset = -1;
39     // if replyStatus is false, the previous chunk is attempted
40     private boolean replyStatus = false;
41     private int chunkIndex = FIRST_CHUNK_INDEX;
42     private int totalChunks;
43     private int lastChunkHashCode = INITIAL_LAST_CHUNK_HASH_CODE;
44     private int nextChunkHashCode = INITIAL_LAST_CHUNK_HASH_CODE;
45     private long snapshotSize;
46     private InputStream snapshotInputStream;
47
48     LeaderInstallSnapshotState(int snapshotChunkSize, String logName) {
49         this.snapshotChunkSize = snapshotChunkSize;
50         this.logName = logName;
51     }
52
53     void setSnapshotBytes(ByteSource snapshotBytes) throws IOException {
54         if (this.snapshotBytes != null) {
55             return;
56         }
57
58         snapshotSize = snapshotBytes.size();
59         snapshotInputStream = snapshotBytes.openStream();
60
61         this.snapshotBytes = snapshotBytes;
62
63         totalChunks = (int) (snapshotSize / snapshotChunkSize + (snapshotSize % snapshotChunkSize > 0 ? 1 : 0));
64
65         LOG.debug("{}: Snapshot {} bytes, total chunks to send: {}", logName, snapshotSize, totalChunks);
66
67         replyReceivedForOffset = -1;
68         chunkIndex = FIRST_CHUNK_INDEX;
69     }
70
71     int incrementOffset() {
72         if (replyStatus) {
73             // if prev chunk failed, we would want to sent the same chunk again
74             offset = offset + snapshotChunkSize;
75         }
76         return offset;
77     }
78
79     int incrementChunkIndex() {
80         if (replyStatus) {
81             // if prev chunk failed, we would want to sent the same chunk again
82             chunkIndex =  chunkIndex + 1;
83         }
84         return chunkIndex;
85     }
86
87     int getChunkIndex() {
88         return chunkIndex;
89     }
90
91     int getTotalChunks() {
92         return totalChunks;
93     }
94
95     boolean canSendNextChunk() {
96         // we only send a false if a chunk is sent but we have not received a reply yet
97         return snapshotBytes != null && (nextChunkHashCode == INITIAL_LAST_CHUNK_HASH_CODE
98                 || replyReceivedForOffset == offset);
99     }
100
101     boolean isLastChunk(int index) {
102         return totalChunks == index;
103     }
104
105     void markSendStatus(boolean success) {
106         if (success) {
107             // if the chunk sent was successful
108             replyReceivedForOffset = offset;
109             replyStatus = true;
110             lastChunkHashCode = nextChunkHashCode;
111         } else {
112             // if the chunk sent was failure
113             replyReceivedForOffset = offset;
114             replyStatus = false;
115         }
116     }
117
118     byte[] getNextChunk() throws IOException {
119         int start = incrementOffset();
120         int size = snapshotChunkSize;
121         if (snapshotChunkSize > snapshotSize) {
122             size = (int) snapshotSize;
123         } else if (start + snapshotChunkSize > snapshotSize) {
124             size = (int) (snapshotSize - start);
125         }
126
127         byte[] nextChunk = new byte[size];
128         int numRead = snapshotInputStream.read(nextChunk);
129         if (numRead != size) {
130             throw new IOException(String.format(
131                     "The # of bytes read from the input stream, %d, does not match the expected # %d", numRead, size));
132         }
133
134         nextChunkHashCode = Arrays.hashCode(nextChunk);
135
136         LOG.debug("{}: Next chunk: total length={}, offset={}, size={}, hashCode={}", logName,
137                 snapshotSize, start, size, nextChunkHashCode);
138         return nextChunk;
139     }
140
141     /**
142      * Reset should be called when the Follower needs to be sent the snapshot from the beginning.
143      */
144     void reset() {
145         closeStream();
146
147         offset = 0;
148         replyStatus = false;
149         replyReceivedForOffset = offset;
150         chunkIndex = FIRST_CHUNK_INDEX;
151         lastChunkHashCode = INITIAL_LAST_CHUNK_HASH_CODE;
152
153         try {
154             snapshotInputStream = snapshotBytes.openStream();
155         } catch (IOException e) {
156             throw Throwables.propagate(e);
157         }
158     }
159
160     @Override
161     public void close() {
162         closeStream();
163         snapshotBytes = null;
164     }
165
166     private void closeStream() {
167         if (snapshotInputStream != null) {
168             try {
169                 snapshotInputStream.close();
170             } catch (IOException e) {
171                 LOG.warn("{}: Error closing snapshot stream", logName);
172             }
173
174             snapshotInputStream = null;
175         }
176     }
177
178     int getLastChunkHashCode() {
179         return lastChunkHashCode;
180     }
181 }