Bug 7449: Add message slicing/re-assembly classes
[controller.git] / opendaylight / md-sal / sal-clustering-commons / src / main / java / org / opendaylight / controller / cluster / messaging / SlicedMessageState.java
1 /*
2  * Copyright (c) 2017 Inocybe Technologies and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.messaging;
9
10 import com.google.common.io.ByteSource;
11 import java.io.IOException;
12 import java.io.InputStream;
13 import java.util.Arrays;
14 import java.util.function.Consumer;
15 import javax.annotation.concurrent.NotThreadSafe;
16 import org.opendaylight.controller.cluster.io.FileBackedOutputStream;
17 import org.opendaylight.yangtools.concepts.Identifier;
18 import org.slf4j.Logger;
19 import org.slf4j.LoggerFactory;
20
21 /**
22  * Maintains the state of a sliced message.
23  *
24  * @author Thomas Pantelis
25  * @see MessageSlicer
26  */
27 @NotThreadSafe
28 public class SlicedMessageState<T> implements AutoCloseable {
29     private static final Logger LOG = LoggerFactory.getLogger(SlicedMessageState.class);
30
31     // The index of the first slice that is sent.
32     static final int FIRST_SLICE_INDEX = 1;
33
34     // The initial hash code for a slice.
35     static final int INITIAL_SLICE_HASH_CODE = -1;
36
37     private final Identifier identifier;
38     private final int messageSliceSize;
39     private final FileBackedOutputStream fileBackedStream;
40     private final T replyTarget;
41     private final ByteSource messageBytes;
42     private final int totalSlices;
43     private final long totalMessageSize;
44     private final int maxRetries;
45     private final Consumer<Throwable> onFailureCallback;
46     private final String logContext;
47
48     private int currentByteOffset = 0;
49     private int currentSliceIndex = FIRST_SLICE_INDEX - 1;
50     private int lastSliceHashCode = INITIAL_SLICE_HASH_CODE;
51     private int currentSliceHashCode = INITIAL_SLICE_HASH_CODE;
52     private int tryCount = 1;
53     private InputStream messageInputStream;
54
55     /**
56      * Constructor.
57      *
58      * @param identifier the identifier for this instance
59      * @param fileBackedStream the FileBackedOutputStream containing the serialized data to slice
60      * @param messageSliceSize the maximum size (in bytes) for a message slice
61      * @param maxRetries the maximum number of retries
62      * @param replyTarget the user-defined target for sliced message replies
63      * @param onFailureCallback the callback to notify on failure
64      * @param logContext the context for log messages
65      * @throws IOException if an error occurs opening the input stream
66      */
67     public SlicedMessageState(final Identifier identifier, final FileBackedOutputStream fileBackedStream,
68             final int messageSliceSize, final int maxRetries, final T replyTarget,
69             final Consumer<Throwable> onFailureCallback, final String logContext) throws IOException {
70         this.identifier = identifier;
71         this.fileBackedStream = fileBackedStream;
72         this.messageSliceSize = messageSliceSize;
73         this.maxRetries = maxRetries;
74         this.replyTarget = replyTarget;
75         this.onFailureCallback = onFailureCallback;
76         this.logContext = logContext;
77
78         messageBytes = fileBackedStream.asByteSource();
79         totalMessageSize = messageBytes.size();
80         messageInputStream = messageBytes.openStream();
81
82         totalSlices = (int)(totalMessageSize / messageSliceSize + (totalMessageSize % messageSliceSize > 0 ? 1 : 0));
83
84         LOG.debug("{}: Message size: {} bytes, total slices to send: {}", logContext, totalMessageSize, totalSlices);
85     }
86
87     /**
88      * Returns the current slice index that has been sent.
89      *
90      * @return the current slice index that has been sent
91      */
92     public int getCurrentSliceIndex() {
93         return currentSliceIndex;
94     }
95
96     /**
97      * Returns the hash code of the last slice that was sent.
98      *
99      * @return the hash code of the last slice that was sent
100      */
101     public int getLastSliceHashCode() {
102         return lastSliceHashCode;
103     }
104
105     /**
106      * Returns the total number of slices to send.
107      *
108      * @return the total number of slices to send
109      */
110     public int getTotalSlices() {
111         return totalSlices;
112     }
113
114     /**
115      * Returns the identifier of this instance.
116      *
117      * @return the identifier
118      */
119     public Identifier getIdentifier() {
120         return identifier;
121     }
122
123     /**
124      * Returns the user-defined target for sliced message replies.
125      *
126      * @return the user-defined target
127      */
128     public T getReplyTarget() {
129         return replyTarget;
130     }
131
132     /**
133      *  Returns the callback to notify on failure.
134      *
135      * @return the callback to notify on failure
136      */
137     public Consumer<Throwable> getOnFailureCallback() {
138         return onFailureCallback;
139     }
140
141     /**
142      * Determines if the slicing can be retried.
143      *
144      * @return true if the slicing can be retried, false if the maximum number of retries has been reached
145      */
146     public boolean canRetry() {
147         return tryCount <= maxRetries;
148     }
149
150     /**
151      * Determines if the given index is the last slice to send.
152      *
153      * @param index the slice index to test
154      * @return true if the index is the last slice, false otherwise
155      */
156     public boolean isLastSlice(int index) {
157         return totalSlices == index;
158     }
159
160     /**
161      * Reads and returns the next slice of data.
162      *
163      * @return the next slice of data as a byte[]
164      * @throws IOException if an error occurs reading the data
165      */
166     public byte[] getNextSlice() throws IOException {
167         currentSliceIndex++;
168         final int start;
169         if (currentSliceIndex == FIRST_SLICE_INDEX) {
170             start = 0;
171         } else {
172             start = incrementByteOffset();
173         }
174
175         final int size;
176         if (messageSliceSize > totalMessageSize) {
177             size = (int) totalMessageSize;
178         } else if (start + messageSliceSize > totalMessageSize) {
179             size = (int) (totalMessageSize - start);
180         } else {
181             size = messageSliceSize;
182         }
183
184         LOG.debug("{}: getNextSlice: total size: {}, offset: {}, size: {}, index: {}", logContext, totalMessageSize,
185                 start, size, currentSliceIndex);
186
187         byte[] nextSlice = new byte[size];
188         int numRead = messageInputStream.read(nextSlice);
189         if (numRead != size) {
190             throw new IOException(String.format(
191                     "The # of bytes read from the input stream, %d, does not match the expected # %d", numRead, size));
192         }
193
194         lastSliceHashCode = currentSliceHashCode;
195         currentSliceHashCode = Arrays.hashCode(nextSlice);
196
197         return nextSlice;
198     }
199
200     /**
201      * Resets this instance to restart slicing from the beginning.
202      *
203      * @throws IOException if an error occurs resetting the input stream
204      */
205     public void reset() throws IOException {
206         closeStream();
207
208         tryCount++;
209         currentByteOffset = 0;
210         currentSliceIndex = FIRST_SLICE_INDEX - 1;
211         lastSliceHashCode = INITIAL_SLICE_HASH_CODE;
212         currentSliceHashCode = INITIAL_SLICE_HASH_CODE;
213
214         messageInputStream = messageBytes.openStream();
215     }
216
217     private int incrementByteOffset() {
218         currentByteOffset  += messageSliceSize;
219         return currentByteOffset;
220     }
221
222     private void closeStream() {
223         if (messageInputStream != null) {
224             try {
225                 messageInputStream.close();
226             } catch (IOException e) {
227                 LOG.warn("{}: Error closing message stream", logContext, e);
228             }
229
230             messageInputStream = null;
231         }
232     }
233
234     @Override
235     public void close() {
236         closeStream();
237         fileBackedStream.cleanup();
238     }
239 }