Rename ValueTypes to LithiumValue
[controller.git] / opendaylight / md-sal / sal-clustering-commons / src / main / java / org / opendaylight / controller / cluster / messaging / SlicedMessageState.java
1 /*
2  * Copyright (c) 2017 Inocybe Technologies and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.messaging;
9
10 import com.google.common.io.ByteSource;
11 import java.io.IOException;
12 import java.io.InputStream;
13 import java.util.Arrays;
14 import java.util.function.Consumer;
15 import org.opendaylight.controller.cluster.io.FileBackedOutputStream;
16 import org.opendaylight.yangtools.concepts.Identifier;
17 import org.slf4j.Logger;
18 import org.slf4j.LoggerFactory;
19
20 /**
21  * Maintains the state of a sliced message. This class is NOT thread-safe.
22  *
23  * @author Thomas Pantelis
24  * @see MessageSlicer
25  */
26 public class SlicedMessageState<T> implements AutoCloseable {
27     private static final Logger LOG = LoggerFactory.getLogger(SlicedMessageState.class);
28
29     // The index of the first slice that is sent.
30     static final int FIRST_SLICE_INDEX = 1;
31
32     // The initial hash code for a slice.
33     static final int INITIAL_SLICE_HASH_CODE = -1;
34
35     private final Identifier identifier;
36     private final int messageSliceSize;
37     private final FileBackedOutputStream fileBackedStream;
38     private final T replyTarget;
39     private final ByteSource messageBytes;
40     private final int totalSlices;
41     private final long totalMessageSize;
42     private final int maxRetries;
43     private final Consumer<Throwable> onFailureCallback;
44     private final String logContext;
45
46     private int currentByteOffset = 0;
47     private int currentSliceIndex = FIRST_SLICE_INDEX - 1;
48     private int lastSliceHashCode = INITIAL_SLICE_HASH_CODE;
49     private int currentSliceHashCode = INITIAL_SLICE_HASH_CODE;
50     private int tryCount = 1;
51     private InputStream messageInputStream;
52
53     /**
54      * Constructor.
55      *
56      * @param identifier the identifier for this instance
57      * @param fileBackedStream the FileBackedOutputStream containing the serialized data to slice
58      * @param messageSliceSize the maximum size (in bytes) for a message slice
59      * @param maxRetries the maximum number of retries
60      * @param replyTarget the user-defined target for sliced message replies
61      * @param onFailureCallback the callback to notify on failure
62      * @param logContext the context for log messages
63      * @throws IOException if an error occurs opening the input stream
64      */
65     public SlicedMessageState(final Identifier identifier, final FileBackedOutputStream fileBackedStream,
66             final int messageSliceSize, final int maxRetries, final T replyTarget,
67             final Consumer<Throwable> onFailureCallback, final String logContext) throws IOException {
68         this.identifier = identifier;
69         this.fileBackedStream = fileBackedStream;
70         this.messageSliceSize = messageSliceSize;
71         this.maxRetries = maxRetries;
72         this.replyTarget = replyTarget;
73         this.onFailureCallback = onFailureCallback;
74         this.logContext = logContext;
75
76         messageBytes = fileBackedStream.asByteSource();
77         totalMessageSize = messageBytes.size();
78         messageInputStream = messageBytes.openStream();
79
80         totalSlices = (int)(totalMessageSize / messageSliceSize + (totalMessageSize % messageSliceSize > 0 ? 1 : 0));
81
82         LOG.debug("{}: Message size: {} bytes, total slices to send: {}", logContext, totalMessageSize, totalSlices);
83     }
84
85     /**
86      * Returns the current slice index that has been sent.
87      *
88      * @return the current slice index that has been sent
89      */
90     public int getCurrentSliceIndex() {
91         return currentSliceIndex;
92     }
93
94     /**
95      * Returns the hash code of the last slice that was sent.
96      *
97      * @return the hash code of the last slice that was sent
98      */
99     public int getLastSliceHashCode() {
100         return lastSliceHashCode;
101     }
102
103     /**
104      * Returns the total number of slices to send.
105      *
106      * @return the total number of slices to send
107      */
108     public int getTotalSlices() {
109         return totalSlices;
110     }
111
112     /**
113      * Returns the identifier of this instance.
114      *
115      * @return the identifier
116      */
117     public Identifier getIdentifier() {
118         return identifier;
119     }
120
121     /**
122      * Returns the user-defined target for sliced message replies.
123      *
124      * @return the user-defined target
125      */
126     public T getReplyTarget() {
127         return replyTarget;
128     }
129
130     /**
131      *  Returns the callback to notify on failure.
132      *
133      * @return the callback to notify on failure
134      */
135     public Consumer<Throwable> getOnFailureCallback() {
136         return onFailureCallback;
137     }
138
139     /**
140      * Determines if the slicing can be retried.
141      *
142      * @return true if the slicing can be retried, false if the maximum number of retries has been reached
143      */
144     public boolean canRetry() {
145         return tryCount <= maxRetries;
146     }
147
148     /**
149      * Determines if the given index is the last slice to send.
150      *
151      * @param index the slice index to test
152      * @return true if the index is the last slice, false otherwise
153      */
154     public boolean isLastSlice(final int index) {
155         return totalSlices == index;
156     }
157
158     /**
159      * Reads and returns the next slice of data.
160      *
161      * @return the next slice of data as a byte[]
162      * @throws IOException if an error occurs reading the data
163      */
164     public byte[] getNextSlice() throws IOException {
165         currentSliceIndex++;
166         final int start;
167         if (currentSliceIndex == FIRST_SLICE_INDEX) {
168             start = 0;
169         } else {
170             start = incrementByteOffset();
171         }
172
173         final int size;
174         if (messageSliceSize > totalMessageSize) {
175             size = (int) totalMessageSize;
176         } else if (start + messageSliceSize > totalMessageSize) {
177             size = (int) (totalMessageSize - start);
178         } else {
179             size = messageSliceSize;
180         }
181
182         LOG.debug("{}: getNextSlice: total size: {}, offset: {}, size: {}, index: {}", logContext, totalMessageSize,
183                 start, size, currentSliceIndex);
184
185         byte[] nextSlice = new byte[size];
186         int numRead = messageInputStream.read(nextSlice);
187         if (numRead != size) {
188             throw new IOException(String.format(
189                     "The # of bytes read from the input stream, %d, does not match the expected # %d", numRead, size));
190         }
191
192         lastSliceHashCode = currentSliceHashCode;
193         currentSliceHashCode = Arrays.hashCode(nextSlice);
194
195         return nextSlice;
196     }
197
198     /**
199      * Resets this instance to restart slicing from the beginning.
200      *
201      * @throws IOException if an error occurs resetting the input stream
202      */
203     public void reset() throws IOException {
204         closeStream();
205
206         tryCount++;
207         currentByteOffset = 0;
208         currentSliceIndex = FIRST_SLICE_INDEX - 1;
209         lastSliceHashCode = INITIAL_SLICE_HASH_CODE;
210         currentSliceHashCode = INITIAL_SLICE_HASH_CODE;
211
212         messageInputStream = messageBytes.openStream();
213     }
214
215     private int incrementByteOffset() {
216         currentByteOffset  += messageSliceSize;
217         return currentByteOffset;
218     }
219
220     private void closeStream() {
221         if (messageInputStream != null) {
222             try {
223                 messageInputStream.close();
224             } catch (IOException e) {
225                 LOG.warn("{}: Error closing message stream", logContext, e);
226             }
227
228             messageInputStream = null;
229         }
230     }
231
232     @Override
233     public void close() {
234         closeStream();
235         fileBackedStream.cleanup();
236     }
237 }