2 * Copyright (c) 2017 Inocybe Technologies and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.messaging;
10 import com.google.common.io.ByteSource;
11 import java.io.IOException;
12 import java.io.InputStream;
13 import java.util.Arrays;
14 import java.util.function.Consumer;
15 import org.opendaylight.controller.cluster.io.FileBackedOutputStream;
16 import org.opendaylight.yangtools.concepts.Identifier;
17 import org.slf4j.Logger;
18 import org.slf4j.LoggerFactory;
21 * Maintains the state of a sliced message. This class is NOT thread-safe.
23 * @author Thomas Pantelis
26 public class SlicedMessageState<T> implements AutoCloseable {
27 private static final Logger LOG = LoggerFactory.getLogger(SlicedMessageState.class);
29 // The index of the first slice that is sent.
30 static final int FIRST_SLICE_INDEX = 1;
32 // The initial hash code for a slice.
33 static final int INITIAL_SLICE_HASH_CODE = -1;
35 private final Identifier identifier;
36 private final int messageSliceSize;
37 private final FileBackedOutputStream fileBackedStream;
38 private final T replyTarget;
39 private final ByteSource messageBytes;
40 private final int totalSlices;
41 private final long totalMessageSize;
42 private final int maxRetries;
43 private final Consumer<Throwable> onFailureCallback;
44 private final String logContext;
46 private int currentByteOffset = 0;
47 private int currentSliceIndex = FIRST_SLICE_INDEX - 1;
48 private int lastSliceHashCode = INITIAL_SLICE_HASH_CODE;
49 private int currentSliceHashCode = INITIAL_SLICE_HASH_CODE;
50 private int tryCount = 1;
51 private InputStream messageInputStream;
56 * @param identifier the identifier for this instance
57 * @param fileBackedStream the FileBackedOutputStream containing the serialized data to slice
58 * @param messageSliceSize the maximum size (in bytes) for a message slice
59 * @param maxRetries the maximum number of retries
60 * @param replyTarget the user-defined target for sliced message replies
61 * @param onFailureCallback the callback to notify on failure
62 * @param logContext the context for log messages
63 * @throws IOException if an error occurs opening the input stream
65 public SlicedMessageState(final Identifier identifier, final FileBackedOutputStream fileBackedStream,
66 final int messageSliceSize, final int maxRetries, final T replyTarget,
67 final Consumer<Throwable> onFailureCallback, final String logContext) throws IOException {
68 this.identifier = identifier;
69 this.fileBackedStream = fileBackedStream;
70 this.messageSliceSize = messageSliceSize;
71 this.maxRetries = maxRetries;
72 this.replyTarget = replyTarget;
73 this.onFailureCallback = onFailureCallback;
74 this.logContext = logContext;
76 messageBytes = fileBackedStream.asByteSource();
77 totalMessageSize = messageBytes.size();
78 messageInputStream = messageBytes.openStream();
80 totalSlices = (int)(totalMessageSize / messageSliceSize + (totalMessageSize % messageSliceSize > 0 ? 1 : 0));
82 LOG.debug("{}: Message size: {} bytes, total slices to send: {}", logContext, totalMessageSize, totalSlices);
86 * Returns the current slice index that has been sent.
88 * @return the current slice index that has been sent
90 public int getCurrentSliceIndex() {
91 return currentSliceIndex;
95 * Returns the hash code of the last slice that was sent.
97 * @return the hash code of the last slice that was sent
99 public int getLastSliceHashCode() {
100 return lastSliceHashCode;
104 * Returns the total number of slices to send.
106 * @return the total number of slices to send
108 public int getTotalSlices() {
113 * Returns the identifier of this instance.
115 * @return the identifier
117 public Identifier getIdentifier() {
122 * Returns the user-defined target for sliced message replies.
124 * @return the user-defined target
126 public T getReplyTarget() {
131 * Returns the callback to notify on failure.
133 * @return the callback to notify on failure
135 public Consumer<Throwable> getOnFailureCallback() {
136 return onFailureCallback;
140 * Determines if the slicing can be retried.
142 * @return true if the slicing can be retried, false if the maximum number of retries has been reached
144 public boolean canRetry() {
145 return tryCount <= maxRetries;
149 * Determines if the given index is the last slice to send.
151 * @param index the slice index to test
152 * @return true if the index is the last slice, false otherwise
154 public boolean isLastSlice(final int index) {
155 return totalSlices == index;
159 * Reads and returns the next slice of data.
161 * @return the next slice of data as a byte[]
162 * @throws IOException if an error occurs reading the data
164 public byte[] getNextSlice() throws IOException {
167 if (currentSliceIndex == FIRST_SLICE_INDEX) {
170 start = incrementByteOffset();
174 if (messageSliceSize > totalMessageSize) {
175 size = (int) totalMessageSize;
176 } else if (start + messageSliceSize > totalMessageSize) {
177 size = (int) (totalMessageSize - start);
179 size = messageSliceSize;
182 LOG.debug("{}: getNextSlice: total size: {}, offset: {}, size: {}, index: {}", logContext, totalMessageSize,
183 start, size, currentSliceIndex);
185 byte[] nextSlice = new byte[size];
186 int numRead = messageInputStream.read(nextSlice);
187 if (numRead != size) {
188 throw new IOException(String.format(
189 "The # of bytes read from the input stream, %d, does not match the expected # %d", numRead, size));
192 lastSliceHashCode = currentSliceHashCode;
193 currentSliceHashCode = Arrays.hashCode(nextSlice);
199 * Resets this instance to restart slicing from the beginning.
201 * @throws IOException if an error occurs resetting the input stream
203 public void reset() throws IOException {
207 currentByteOffset = 0;
208 currentSliceIndex = FIRST_SLICE_INDEX - 1;
209 lastSliceHashCode = INITIAL_SLICE_HASH_CODE;
210 currentSliceHashCode = INITIAL_SLICE_HASH_CODE;
212 messageInputStream = messageBytes.openStream();
215 private int incrementByteOffset() {
216 currentByteOffset += messageSliceSize;
217 return currentByteOffset;
220 private void closeStream() {
221 if (messageInputStream != null) {
223 messageInputStream.close();
224 } catch (IOException e) {
225 LOG.warn("{}: Error closing message stream", logContext, e);
228 messageInputStream = null;
233 public void close() {
235 fileBackedStream.cleanup();