Bug 7449: Add message slicing/re-assembly classes
[controller.git] / opendaylight / md-sal / sal-clustering-commons / src / main / java / org / opendaylight / controller / cluster / messaging / SlicedMessageState.java
diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/messaging/SlicedMessageState.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/messaging/SlicedMessageState.java
new file mode 100644 (file)
index 0000000..8c3cb51
--- /dev/null
@@ -0,0 +1,239 @@
+/*
+ * Copyright (c) 2017 Inocybe Technologies and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.messaging;
+
+import com.google.common.io.ByteSource;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Arrays;
+import java.util.function.Consumer;
+import javax.annotation.concurrent.NotThreadSafe;
+import org.opendaylight.controller.cluster.io.FileBackedOutputStream;
+import org.opendaylight.yangtools.concepts.Identifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Maintains the state of a sliced message.
+ *
+ * @author Thomas Pantelis
+ * @see MessageSlicer
+ */
+@NotThreadSafe
+public class SlicedMessageState<T> implements AutoCloseable {
+    private static final Logger LOG = LoggerFactory.getLogger(SlicedMessageState.class);
+
+    // The index of the first slice that is sent.
+    static final int FIRST_SLICE_INDEX = 1;
+
+    // The initial hash code for a slice.
+    static final int INITIAL_SLICE_HASH_CODE = -1;
+
+    private final Identifier identifier;
+    private final int messageSliceSize;
+    private final FileBackedOutputStream fileBackedStream;
+    private final T replyTarget;
+    private final ByteSource messageBytes;
+    private final int totalSlices;
+    private final long totalMessageSize;
+    private final int maxRetries;
+    private final Consumer<Throwable> onFailureCallback;
+    private final String logContext;
+
+    private int currentByteOffset = 0;
+    private int currentSliceIndex = FIRST_SLICE_INDEX - 1;
+    private int lastSliceHashCode = INITIAL_SLICE_HASH_CODE;
+    private int currentSliceHashCode = INITIAL_SLICE_HASH_CODE;
+    private int tryCount = 1;
+    private InputStream messageInputStream;
+
+    /**
+     * Constructor.
+     *
+     * @param identifier the identifier for this instance
+     * @param fileBackedStream the FileBackedOutputStream containing the serialized data to slice
+     * @param messageSliceSize the maximum size (in bytes) for a message slice
+     * @param maxRetries the maximum number of retries
+     * @param replyTarget the user-defined target for sliced message replies
+     * @param onFailureCallback the callback to notify on failure
+     * @param logContext the context for log messages
+     * @throws IOException if an error occurs opening the input stream
+     */
+    public SlicedMessageState(final Identifier identifier, final FileBackedOutputStream fileBackedStream,
+            final int messageSliceSize, final int maxRetries, final T replyTarget,
+            final Consumer<Throwable> onFailureCallback, final String logContext) throws IOException {
+        this.identifier = identifier;
+        this.fileBackedStream = fileBackedStream;
+        this.messageSliceSize = messageSliceSize;
+        this.maxRetries = maxRetries;
+        this.replyTarget = replyTarget;
+        this.onFailureCallback = onFailureCallback;
+        this.logContext = logContext;
+
+        messageBytes = fileBackedStream.asByteSource();
+        totalMessageSize = messageBytes.size();
+        messageInputStream = messageBytes.openStream();
+
+        totalSlices = (int)(totalMessageSize / messageSliceSize + (totalMessageSize % messageSliceSize > 0 ? 1 : 0));
+
+        LOG.debug("{}: Message size: {} bytes, total slices to send: {}", logContext, totalMessageSize, totalSlices);
+    }
+
+    /**
+     * Returns the current slice index that has been sent.
+     *
+     * @return the current slice index that has been sent
+     */
+    public int getCurrentSliceIndex() {
+        return currentSliceIndex;
+    }
+
+    /**
+     * Returns the hash code of the last slice that was sent.
+     *
+     * @return the hash code of the last slice that was sent
+     */
+    public int getLastSliceHashCode() {
+        return lastSliceHashCode;
+    }
+
+    /**
+     * Returns the total number of slices to send.
+     *
+     * @return the total number of slices to send
+     */
+    public int getTotalSlices() {
+        return totalSlices;
+    }
+
+    /**
+     * Returns the identifier of this instance.
+     *
+     * @return the identifier
+     */
+    public Identifier getIdentifier() {
+        return identifier;
+    }
+
+    /**
+     * Returns the user-defined target for sliced message replies.
+     *
+     * @return the user-defined target
+     */
+    public T getReplyTarget() {
+        return replyTarget;
+    }
+
+    /**
+     *  Returns the callback to notify on failure.
+     *
+     * @return the callback to notify on failure
+     */
+    public Consumer<Throwable> getOnFailureCallback() {
+        return onFailureCallback;
+    }
+
+    /**
+     * Determines if the slicing can be retried.
+     *
+     * @return true if the slicing can be retried, false if the maximum number of retries has been reached
+     */
+    public boolean canRetry() {
+        return tryCount <= maxRetries;
+    }
+
+    /**
+     * Determines if the given index is the last slice to send.
+     *
+     * @param index the slice index to test
+     * @return true if the index is the last slice, false otherwise
+     */
+    public boolean isLastSlice(int index) {
+        return totalSlices == index;
+    }
+
+    /**
+     * Reads and returns the next slice of data.
+     *
+     * @return the next slice of data as a byte[]
+     * @throws IOException if an error occurs reading the data
+     */
+    public byte[] getNextSlice() throws IOException {
+        currentSliceIndex++;
+        final int start;
+        if (currentSliceIndex == FIRST_SLICE_INDEX) {
+            start = 0;
+        } else {
+            start = incrementByteOffset();
+        }
+
+        final int size;
+        if (messageSliceSize > totalMessageSize) {
+            size = (int) totalMessageSize;
+        } else if (start + messageSliceSize > totalMessageSize) {
+            size = (int) (totalMessageSize - start);
+        } else {
+            size = messageSliceSize;
+        }
+
+        LOG.debug("{}: getNextSlice: total size: {}, offset: {}, size: {}, index: {}", logContext, totalMessageSize,
+                start, size, currentSliceIndex);
+
+        byte[] nextSlice = new byte[size];
+        int numRead = messageInputStream.read(nextSlice);
+        if (numRead != size) {
+            throw new IOException(String.format(
+                    "The # of bytes read from the input stream, %d, does not match the expected # %d", numRead, size));
+        }
+
+        lastSliceHashCode = currentSliceHashCode;
+        currentSliceHashCode = Arrays.hashCode(nextSlice);
+
+        return nextSlice;
+    }
+
+    /**
+     * Resets this instance to restart slicing from the beginning.
+     *
+     * @throws IOException if an error occurs resetting the input stream
+     */
+    public void reset() throws IOException {
+        closeStream();
+
+        tryCount++;
+        currentByteOffset = 0;
+        currentSliceIndex = FIRST_SLICE_INDEX - 1;
+        lastSliceHashCode = INITIAL_SLICE_HASH_CODE;
+        currentSliceHashCode = INITIAL_SLICE_HASH_CODE;
+
+        messageInputStream = messageBytes.openStream();
+    }
+
+    private int incrementByteOffset() {
+        currentByteOffset  += messageSliceSize;
+        return currentByteOffset;
+    }
+
+    private void closeStream() {
+        if (messageInputStream != null) {
+            try {
+                messageInputStream.close();
+            } catch (IOException e) {
+                LOG.warn("{}: Error closing message stream", logContext, e);
+            }
+
+            messageInputStream = null;
+        }
+    }
+
+    @Override
+    public void close() {
+        closeStream();
+        fileBackedStream.cleanup();
+    }
+}