2 * Copyright (c) 2017 Inocybe Technologies and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.messaging;
10 import com.google.common.io.ByteSource;
11 import java.io.IOException;
12 import java.io.InputStream;
13 import java.util.Arrays;
14 import java.util.function.Consumer;
15 import javax.annotation.concurrent.NotThreadSafe;
16 import org.opendaylight.controller.cluster.io.FileBackedOutputStream;
17 import org.opendaylight.yangtools.concepts.Identifier;
18 import org.slf4j.Logger;
19 import org.slf4j.LoggerFactory;
22 * Maintains the state of a sliced message.
24 * @author Thomas Pantelis
28 public class SlicedMessageState<T> implements AutoCloseable {
29 private static final Logger LOG = LoggerFactory.getLogger(SlicedMessageState.class);
31 // The index of the first slice that is sent.
32 static final int FIRST_SLICE_INDEX = 1;
34 // The initial hash code for a slice.
35 static final int INITIAL_SLICE_HASH_CODE = -1;
37 private final Identifier identifier;
38 private final int messageSliceSize;
39 private final FileBackedOutputStream fileBackedStream;
40 private final T replyTarget;
41 private final ByteSource messageBytes;
42 private final int totalSlices;
43 private final long totalMessageSize;
44 private final int maxRetries;
45 private final Consumer<Throwable> onFailureCallback;
46 private final String logContext;
48 private int currentByteOffset = 0;
49 private int currentSliceIndex = FIRST_SLICE_INDEX - 1;
50 private int lastSliceHashCode = INITIAL_SLICE_HASH_CODE;
51 private int currentSliceHashCode = INITIAL_SLICE_HASH_CODE;
52 private int tryCount = 1;
53 private InputStream messageInputStream;
58 * @param identifier the identifier for this instance
59 * @param fileBackedStream the FileBackedOutputStream containing the serialized data to slice
60 * @param messageSliceSize the maximum size (in bytes) for a message slice
61 * @param maxRetries the maximum number of retries
62 * @param replyTarget the user-defined target for sliced message replies
63 * @param onFailureCallback the callback to notify on failure
64 * @param logContext the context for log messages
65 * @throws IOException if an error occurs opening the input stream
67 public SlicedMessageState(final Identifier identifier, final FileBackedOutputStream fileBackedStream,
68 final int messageSliceSize, final int maxRetries, final T replyTarget,
69 final Consumer<Throwable> onFailureCallback, final String logContext) throws IOException {
70 this.identifier = identifier;
71 this.fileBackedStream = fileBackedStream;
72 this.messageSliceSize = messageSliceSize;
73 this.maxRetries = maxRetries;
74 this.replyTarget = replyTarget;
75 this.onFailureCallback = onFailureCallback;
76 this.logContext = logContext;
78 messageBytes = fileBackedStream.asByteSource();
79 totalMessageSize = messageBytes.size();
80 messageInputStream = messageBytes.openStream();
82 totalSlices = (int)(totalMessageSize / messageSliceSize + (totalMessageSize % messageSliceSize > 0 ? 1 : 0));
84 LOG.debug("{}: Message size: {} bytes, total slices to send: {}", logContext, totalMessageSize, totalSlices);
88 * Returns the current slice index that has been sent.
90 * @return the current slice index that has been sent
92 public int getCurrentSliceIndex() {
93 return currentSliceIndex;
97 * Returns the hash code of the last slice that was sent.
99 * @return the hash code of the last slice that was sent
101 public int getLastSliceHashCode() {
102 return lastSliceHashCode;
106 * Returns the total number of slices to send.
108 * @return the total number of slices to send
110 public int getTotalSlices() {
115 * Returns the identifier of this instance.
117 * @return the identifier
119 public Identifier getIdentifier() {
124 * Returns the user-defined target for sliced message replies.
126 * @return the user-defined target
128 public T getReplyTarget() {
133 * Returns the callback to notify on failure.
135 * @return the callback to notify on failure
137 public Consumer<Throwable> getOnFailureCallback() {
138 return onFailureCallback;
142 * Determines if the slicing can be retried.
144 * @return true if the slicing can be retried, false if the maximum number of retries has been reached
146 public boolean canRetry() {
147 return tryCount <= maxRetries;
151 * Determines if the given index is the last slice to send.
153 * @param index the slice index to test
154 * @return true if the index is the last slice, false otherwise
156 public boolean isLastSlice(int index) {
157 return totalSlices == index;
161 * Reads and returns the next slice of data.
163 * @return the next slice of data as a byte[]
164 * @throws IOException if an error occurs reading the data
166 public byte[] getNextSlice() throws IOException {
169 if (currentSliceIndex == FIRST_SLICE_INDEX) {
172 start = incrementByteOffset();
176 if (messageSliceSize > totalMessageSize) {
177 size = (int) totalMessageSize;
178 } else if (start + messageSliceSize > totalMessageSize) {
179 size = (int) (totalMessageSize - start);
181 size = messageSliceSize;
184 LOG.debug("{}: getNextSlice: total size: {}, offset: {}, size: {}, index: {}", logContext, totalMessageSize,
185 start, size, currentSliceIndex);
187 byte[] nextSlice = new byte[size];
188 int numRead = messageInputStream.read(nextSlice);
189 if (numRead != size) {
190 throw new IOException(String.format(
191 "The # of bytes read from the input stream, %d, does not match the expected # %d", numRead, size));
194 lastSliceHashCode = currentSliceHashCode;
195 currentSliceHashCode = Arrays.hashCode(nextSlice);
201 * Resets this instance to restart slicing from the beginning.
203 * @throws IOException if an error occurs resetting the input stream
205 public void reset() throws IOException {
209 currentByteOffset = 0;
210 currentSliceIndex = FIRST_SLICE_INDEX - 1;
211 lastSliceHashCode = INITIAL_SLICE_HASH_CODE;
212 currentSliceHashCode = INITIAL_SLICE_HASH_CODE;
214 messageInputStream = messageBytes.openStream();
217 private int incrementByteOffset() {
218 currentByteOffset += messageSliceSize;
219 return currentByteOffset;
222 private void closeStream() {
223 if (messageInputStream != null) {
225 messageInputStream.close();
226 } catch (IOException e) {
227 LOG.warn("{}: Error closing message stream", logContext, e);
230 messageInputStream = null;
235 public void close() {
237 fileBackedStream.cleanup();