Bug 7449: Add message slicing/re-assembly classes 67/55767/26
authorTom Pantelis <tompantelis@gmail.com>
Thu, 20 Apr 2017 13:10:05 +0000 (09:10 -0400)
committerRobert Varga <nite@hq.sk>
Tue, 20 Jun 2017 09:27:22 +0000 (09:27 +0000)
Added re-usable classes for slicing messages into smaller chunks and
re-assembling them. The process is similar to the current raft
install snapshot chunking.

The 2 main classes are MessageSlicer and MessageAssembler. The basic workflow
is:

 - MessageSlicer#slice method is called which serializes the message,
   creates and caches a SlicedMessageState instance and determines the number
   of slices based on the serialized size and the maximum message slice size,
   then sends the first MessageSlice message.
 - The MessageAssembler on the other end receives the MessageSlice, creates an
   AssembledMessageState instance if necessary and appends the byte[] chunk to
   the assembled stream. A MessageSliceReply is returned.
 - The MessageSlicer receives the MessageSliceReply and sends the next
   MessageSlice chunk, if necessary.
 - Once the last MessageSlice chunk is received by the MessageAssembler, it
   re-assembles the original message by de-serializing the assembled stream
   and notifies the user-supplied callback (of type Consumer<Object>) to handle
   the message.

Both MessageSlicer and MessageAssembler use a guava Cache and can be configured
to evict state that has been inactive for a period of time, ie if a message hasn't
been received by the other end.

The MessageSliceReply can propagate a MessageSliceException. If the
MessageSliceException indicates it's re-triable, the MessageSlicer will restart
slicing from the beginning. Otherwise slicing is aborted and the user-supplied
failure callback is notified.

Change-Id: Iceea212b12f49c3944bade50afded92244e4b31a
Signed-off-by: Tom Pantelis <tompantelis@gmail.com>
23 files changed:
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/io/FileBackedOutputStreamFactory.java [new file with mode: 0644]
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/messaging/AbortSlicing.java [new file with mode: 0644]
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/messaging/AssembledMessageState.java [new file with mode: 0644]
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/messaging/AssemblerClosedException.java [new file with mode: 0644]
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/messaging/AssemblerSealedException.java [new file with mode: 0644]
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/messaging/MessageAssembler.java [new file with mode: 0644]
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/messaging/MessageSlice.java [new file with mode: 0644]
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/messaging/MessageSliceException.java [new file with mode: 0644]
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/messaging/MessageSliceIdentifier.java [new file with mode: 0644]
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/messaging/MessageSliceReply.java [new file with mode: 0644]
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/messaging/MessageSlicer.java [new file with mode: 0644]
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/messaging/SliceOptions.java [new file with mode: 0644]
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/messaging/SlicedMessageState.java [new file with mode: 0644]
opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/cluster/messaging/AbortSlicingTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/cluster/messaging/AbstractMessagingTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/cluster/messaging/BytesMessage.java [new file with mode: 0644]
opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/cluster/messaging/MessageAssemblerTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/cluster/messaging/MessageSliceIdentifierTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/cluster/messaging/MessageSliceReplyTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/cluster/messaging/MessageSliceTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/cluster/messaging/MessageSlicerTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/cluster/messaging/MessageSlicingIntegrationTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/cluster/messaging/StringIdentifier.java [new file with mode: 0644]

diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/io/FileBackedOutputStreamFactory.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/io/FileBackedOutputStreamFactory.java
new file mode 100644 (file)
index 0000000..0cd4be6
--- /dev/null
@@ -0,0 +1,42 @@
+/*
+ * Copyright (c) 2017 Inocybe Technologies and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.io;
+
+import javax.annotation.Nullable;
+
+/**
+ * A factory for creating {@link FileBackedOutputStream} instances.
+ *
+ * @author Thomas Pantelis
+ * @see FileBackedOutputStream
+ */
+public class FileBackedOutputStreamFactory {
+    private final int fileThreshold;
+    private final String fileDirectory;
+
+    /**
+     * Constructor.
+     *
+     * @param fileThreshold the number of bytes before streams should switch to buffering to a file
+     * @param fileDirectory the directory in which to create files if needed. If null, the default temp file
+     *                      location is used.
+     */
+    public FileBackedOutputStreamFactory(final int fileThreshold, final @Nullable String fileDirectory) {
+        this.fileThreshold = fileThreshold;
+        this.fileDirectory = fileDirectory;
+    }
+
+    /**
+     * Creates a new {@link FileBackedOutputStream} with the settings configured for this factory.
+     *
+     * @return a {@link FileBackedOutputStream} instance
+     */
+    public FileBackedOutputStream newInstance() {
+        return new FileBackedOutputStream(fileThreshold, fileDirectory);
+    }
+}
diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/messaging/AbortSlicing.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/messaging/AbortSlicing.java
new file mode 100644 (file)
index 0000000..b136ed7
--- /dev/null
@@ -0,0 +1,74 @@
+/*
+ * Copyright (c) 2017 Inocybe Technologies and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.messaging;
+
+import com.google.common.base.Preconditions;
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.io.Serializable;
+import org.opendaylight.yangtools.concepts.Identifier;
+
+/**
+ * Message sent to abort slicing.
+ *
+ * @author Thomas Pantelis
+ */
+class AbortSlicing implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    private final Identifier identifier;
+
+    AbortSlicing(final Identifier identifier) {
+        this.identifier = Preconditions.checkNotNull(identifier);
+    }
+
+    Identifier getIdentifier() {
+        return identifier;
+    }
+
+    @Override
+    public String toString() {
+        return "AbortSlicing [identifier=" + identifier + "]";
+    }
+
+    private Object writeReplace() {
+        return new Proxy(this);
+    }
+
+    private static class Proxy implements Externalizable {
+        private static final long serialVersionUID = 1L;
+
+        private AbortSlicing abortSlicing;
+
+        // checkstyle flags the public modifier as redundant which really doesn't make sense since it clearly isn't
+        // redundant. It is explicitly needed for Java serialization to be able to create instances via reflection.
+        @SuppressWarnings("checkstyle:RedundantModifier")
+        public Proxy() {
+        }
+
+        Proxy(AbortSlicing abortSlicing) {
+            this.abortSlicing = abortSlicing;
+        }
+
+        @Override
+        public void writeExternal(ObjectOutput out) throws IOException {
+            out.writeObject(abortSlicing.identifier);
+        }
+
+        @Override
+        public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+            abortSlicing = new AbortSlicing((Identifier) in.readObject());
+        }
+
+        private Object readResolve() {
+            return abortSlicing;
+        }
+    }
+}
diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/messaging/AssembledMessageState.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/messaging/AssembledMessageState.java
new file mode 100644 (file)
index 0000000..16c73c7
--- /dev/null
@@ -0,0 +1,171 @@
+/*
+ * Copyright (c) 2017 Inocybe Technologies and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.messaging;
+
+import com.google.common.base.Preconditions;
+import com.google.common.io.ByteSource;
+import java.io.BufferedOutputStream;
+import java.io.IOException;
+import java.util.Arrays;
+import javax.annotation.concurrent.NotThreadSafe;
+import org.opendaylight.controller.cluster.io.FileBackedOutputStream;
+import org.opendaylight.controller.cluster.io.FileBackedOutputStreamFactory;
+import org.opendaylight.yangtools.concepts.Identifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Maintains the state of an assembled message.
+ *
+ * @author Thomas Pantelis
+ */
+@NotThreadSafe
+public class AssembledMessageState implements AutoCloseable {
+    private static final Logger LOG = LoggerFactory.getLogger(AssembledMessageState.class);
+
+    private final int totalSlices;
+    private final BufferedOutputStream bufferedStream;
+    private final FileBackedOutputStream fileBackedStream;
+    private final Identifier identifier;
+    private final String logContext;
+
+    private int lastSliceIndexReceived = SlicedMessageState.FIRST_SLICE_INDEX - 1;
+    private int lastSliceHashCodeReceived = SlicedMessageState.INITIAL_SLICE_HASH_CODE;
+    private boolean sealed = false;
+    private boolean closed = false;
+    private long assembledSize;
+
+    /**
+     * Constructor.
+     *
+     * @param identifier the identifier for this instance
+     * @param totalSlices the total number of slices to expect
+     * @param fileBackedStreamFactory factory for creating the FileBackedOutputStream instance used for streaming
+     * @param logContext the context for log messages
+     */
+    public AssembledMessageState(final Identifier identifier, final int totalSlices,
+            final FileBackedOutputStreamFactory fileBackedStreamFactory, final String logContext) {
+        this.identifier = identifier;
+        this.totalSlices = totalSlices;
+        this.logContext = logContext;
+
+        fileBackedStream = fileBackedStreamFactory.newInstance();
+        bufferedStream = new BufferedOutputStream(fileBackedStream);
+    }
+
+    /**
+     * Returns the identifier of this instance.
+     *
+     * @return the identifier
+     */
+    public Identifier getIdentifier() {
+        return identifier;
+    }
+
+    /**
+     * Adds a slice to the assembled stream.
+     *
+     * @param sliceIndex the index of the slice
+     * @param data the sliced data
+     * @param lastSliceHashCode the hash code of the last slice sent
+     * @return true if this is the last slice received, false otherwise
+     * @throws MessageSliceException
+     *         <ul>
+     *         <li>if the slice index is invalid</li>
+     *         <li>if the last slice hash code is invalid</li>
+     *         <li>if an error occurs writing the data to the stream</li>
+     *         </ul>
+     *         In addition, this instance is automatically closed and can no longer be used.
+     * @throws AssemblerSealedException if this instance is already sealed (ie has received all the slices)
+     * @throws AssemblerClosedException if this instance is already closed
+     */
+    public boolean addSlice(final int sliceIndex, final byte[] data, final int lastSliceHashCode)
+            throws MessageSliceException {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("{}: addSlice: identifier: {}, sliceIndex: {}, lastSliceIndex: {}, assembledSize: {}, "
+                    + "sliceHashCode: {}, lastSliceHashCode: {}", logContext, identifier, sliceIndex,
+                    lastSliceIndexReceived, assembledSize, lastSliceHashCode, lastSliceHashCodeReceived);
+        }
+
+        try {
+            validateSlice(sliceIndex, lastSliceHashCode);
+
+            assembledSize += data.length;
+            lastSliceIndexReceived = sliceIndex;
+            lastSliceHashCodeReceived = Arrays.hashCode(data);
+
+            bufferedStream.write(data);
+
+            sealed = sliceIndex == totalSlices;
+            if (sealed) {
+                bufferedStream.close();
+            }
+        } catch (IOException e) {
+            close();
+            throw new MessageSliceException(String.format("Error writing data for slice %d of message %s",
+                    sliceIndex, identifier), e);
+        }
+
+        return sealed;
+    }
+
+    /**
+     * Returns the assembled bytes as a ByteSource. This method must only be called after this instance is sealed.
+     *
+     * @return a ByteSource containing the assembled bytes
+     * @throws IOException if an error occurs obtaining the assembled bytes
+     * @throws IllegalStateException is this instance is not sealed
+     */
+    public ByteSource getAssembledBytes() throws IOException {
+        Preconditions.checkState(sealed, "Last slice not received yet");
+        return fileBackedStream.asByteSource();
+    }
+
+    private void validateSlice(final int sliceIndex, final int lastSliceHashCode) throws MessageSliceException {
+        if (closed) {
+            throw new AssemblerClosedException(identifier);
+        }
+
+        if (sealed) {
+            throw new AssemblerSealedException(String.format(
+                    "Received slice index for message %s but all %d expected slices have already already received.",
+                    identifier, totalSlices));
+        }
+
+        if (lastSliceIndexReceived + 1 != sliceIndex) {
+            close();
+            throw new MessageSliceException(String.format("Expected sliceIndex %d but got %d for message %s",
+                    lastSliceIndexReceived + 1, sliceIndex, identifier), true);
+        }
+
+        if (lastSliceHashCode != lastSliceHashCodeReceived) {
+            close();
+            throw new MessageSliceException(String.format("The hash code of the recorded last slice (%d) does not "
+                    + "match the senders last hash code (%d) for message %s", lastSliceHashCodeReceived,
+                    lastSliceHashCode, identifier), true);
+        }
+    }
+
+    @Override
+    public void close() {
+        if (closed) {
+            return;
+        }
+
+        closed = true;
+        if (!sealed) {
+            try {
+                bufferedStream.close();
+            } catch (IOException e) {
+                LOG.debug("{}: Error closing output stream", logContext, e);
+            }
+        }
+
+        fileBackedStream.cleanup();
+    }
+}
diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/messaging/AssemblerClosedException.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/messaging/AssemblerClosedException.java
new file mode 100644 (file)
index 0000000..83c8dcb
--- /dev/null
@@ -0,0 +1,28 @@
+/*
+ * Copyright (c) 2017 Inocybe Technologies and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.messaging;
+
+import org.opendaylight.yangtools.concepts.Identifier;
+
+/**
+ * A MessageSliceException indicating the message assembler has already been closed.
+ *
+ * @author Thomas Pantelis
+ */
+public class AssemblerClosedException extends MessageSliceException {
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * Constructs an instance.
+     *
+     * @param identifier the identifier whose state was closed
+     */
+    public AssemblerClosedException(final Identifier identifier) {
+        super(String.format("Message assembler for %s has already been closed", identifier), false);
+    }
+}
diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/messaging/AssemblerSealedException.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/messaging/AssemblerSealedException.java
new file mode 100644 (file)
index 0000000..df9ac63
--- /dev/null
@@ -0,0 +1,26 @@
+/*
+ * Copyright (c) 2017 Inocybe Technologies and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.messaging;
+
+/**
+ * A MessageSliceException indicating the message assembler has already been sealed.
+ *
+ * @author Thomas Pantelis
+ */
+public class AssemblerSealedException extends MessageSliceException {
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * Constructs an instance.
+     *
+     * @param message he detail message
+     */
+    public AssemblerSealedException(String message) {
+        super(message, false);
+    }
+}
diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/messaging/MessageAssembler.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/messaging/MessageAssembler.java
new file mode 100644 (file)
index 0000000..71b07f4
--- /dev/null
@@ -0,0 +1,286 @@
+/*
+ * Copyright (c) 2017 Inocybe Technologies and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.messaging;
+
+import akka.actor.ActorRef;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
+import com.google.common.io.ByteSource;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiConsumer;
+import javax.annotation.Nonnull;
+import org.opendaylight.controller.cluster.io.FileBackedOutputStreamFactory;
+import org.opendaylight.yangtools.concepts.Identifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class re-assembles messages sliced into smaller chunks by {@link MessageSlicer}.
+ *
+ * @author Thomas Pantelis
+ * @see MessageSlicer
+ */
+public class MessageAssembler implements AutoCloseable {
+    private static final Logger LOG = LoggerFactory.getLogger(MessageAssembler.class);
+
+    private final Cache<Identifier, AssembledMessageState> stateCache;
+    private final FileBackedOutputStreamFactory filedBackedStreamFactory;
+    private final BiConsumer<Object, ActorRef> assembledMessageCallback;
+    private final String logContext;
+
+    private MessageAssembler(Builder builder) {
+        this.filedBackedStreamFactory = Preconditions.checkNotNull(builder.filedBackedStreamFactory,
+                "FiledBackedStreamFactory cannot be null");
+        this.assembledMessageCallback = Preconditions.checkNotNull(builder.assembledMessageCallback,
+                "assembledMessageCallback cannot be null");
+        this.logContext = builder.logContext;
+
+        stateCache = CacheBuilder.newBuilder()
+                .expireAfterAccess(builder.expireStateAfterInactivityDuration, builder.expireStateAfterInactivityUnit)
+                .removalListener((RemovalListener<Identifier, AssembledMessageState>) notification ->
+                    stateRemoved(notification)).build();
+    }
+
+    /**
+     * Returns a new Builder for creating MessageAssembler instances.
+     *
+     * @return a Builder instance
+     */
+    public static Builder builder() {
+        return new Builder();
+    }
+
+    /**
+     * Checks if the given message is handled by this class. If so, it should be forwarded to the
+     * {@link #handleMessage(Object, ActorRef)} method
+     *
+     * @param message the message to check
+     * @return true if handled, false otherwise
+     */
+    public static boolean isHandledMessage(Object message) {
+        return message instanceof MessageSlice || message instanceof AbortSlicing;
+    }
+
+    @Override
+    public void close() {
+        LOG.debug("{}: Closing", logContext);
+        stateCache.invalidateAll();
+    }
+
+    /**
+     * Checks for and removes assembled message state that has expired due to inactivity from the slicing component
+     * on the other end.
+     */
+    public void checkExpiredAssembledMessageState() {
+        if (stateCache.size() > 0) {
+            stateCache.cleanUp();
+        }
+    }
+
+    /**
+     * Invoked to handle message slices and other messages pertaining to this class.
+     *
+     * @param message the message
+     * @param sendTo the reference of the actor to which subsequent message slices should be sent
+     * @return true if the message was handled, false otherwise
+     */
+    public boolean handleMessage(final Object message, final @Nonnull ActorRef sendTo) {
+        if (message instanceof MessageSlice) {
+            LOG.debug("{}: handleMessage: {}", logContext, message);
+            onMessageSlice((MessageSlice) message, sendTo);
+            return true;
+        } else if (message instanceof AbortSlicing) {
+            LOG.debug("{}: handleMessage: {}", logContext, message);
+            onAbortSlicing((AbortSlicing) message);
+            return true;
+        }
+
+        return false;
+    }
+
+    private void onMessageSlice(final MessageSlice messageSlice, final ActorRef sendTo) {
+        final Identifier identifier = messageSlice.getIdentifier();
+        try {
+            final AssembledMessageState state = stateCache.get(identifier, () -> createState(messageSlice));
+            processMessageSliceForState(messageSlice, state, sendTo);
+        } catch (ExecutionException e) {
+            final MessageSliceException messageSliceEx;
+            final Throwable cause = e.getCause();
+            if (cause instanceof MessageSliceException) {
+                messageSliceEx = (MessageSliceException) cause;
+            } else {
+                messageSliceEx = new MessageSliceException(String.format(
+                        "Error creating state for identifier %s", identifier), cause);
+            }
+
+            messageSlice.getReplyTo().tell(MessageSliceReply.failed(identifier, messageSliceEx, sendTo),
+                    ActorRef.noSender());
+        }
+    }
+
+    private AssembledMessageState createState(final MessageSlice messageSlice) throws MessageSliceException {
+        final Identifier identifier = messageSlice.getIdentifier();
+        if (messageSlice.getSliceIndex() == SlicedMessageState.FIRST_SLICE_INDEX) {
+            LOG.debug("{}: Received first slice for {} - creating AssembledMessageState", logContext, identifier);
+            return new AssembledMessageState(identifier, messageSlice.getTotalSlices(),
+                    filedBackedStreamFactory, logContext);
+        }
+
+        LOG.debug("{}: AssembledMessageState not found for {} - returning failed reply", logContext, identifier);
+        throw new MessageSliceException(String.format(
+                "No assembled state found for identifier %s and slice index %s", identifier,
+                messageSlice.getSliceIndex()), true);
+    }
+
+    private void processMessageSliceForState(final MessageSlice messageSlice, AssembledMessageState state,
+            final ActorRef sendTo) {
+        final Identifier identifier = messageSlice.getIdentifier();
+        final ActorRef replyTo = messageSlice.getReplyTo();
+        Object reAssembledMessage = null;
+        synchronized (state) {
+            final int sliceIndex = messageSlice.getSliceIndex();
+            try {
+                final MessageSliceReply successReply = MessageSliceReply.success(identifier, sliceIndex, sendTo);
+                if (state.addSlice(sliceIndex, messageSlice.getData(), messageSlice.getLastSliceHashCode())) {
+                    LOG.debug("{}: Received last slice for {}", logContext, identifier);
+
+                    reAssembledMessage = reAssembleMessage(state);
+
+                    replyTo.tell(successReply, ActorRef.noSender());
+                    removeState(identifier);
+                } else {
+                    LOG.debug("{}: Added slice for {} - expecting more", logContext, identifier);
+                    replyTo.tell(successReply, ActorRef.noSender());
+                }
+            } catch (MessageSliceException e) {
+                LOG.warn("{}: Error processing {}", logContext, messageSlice, e);
+                replyTo.tell(MessageSliceReply.failed(identifier, e, sendTo), ActorRef.noSender());
+                removeState(identifier);
+            }
+        }
+
+        if (reAssembledMessage != null) {
+            LOG.debug("{}: Notifying callback of re-assembled message {}", logContext, reAssembledMessage);
+            assembledMessageCallback.accept(reAssembledMessage, replyTo);
+        }
+    }
+
+    private Object reAssembleMessage(final AssembledMessageState state) throws MessageSliceException {
+        try {
+            final ByteSource assembledBytes = state.getAssembledBytes();
+            try (ObjectInputStream in = new ObjectInputStream(assembledBytes.openStream())) {
+                return in.readObject();
+            }
+
+        } catch (IOException | ClassNotFoundException  e) {
+            throw new MessageSliceException(String.format("Error re-assembling bytes for identifier %s",
+                    state.getIdentifier()), e);
+        }
+    }
+
+    private void onAbortSlicing(AbortSlicing message) {
+        removeState(message.getIdentifier());
+    }
+
+    private void removeState(final Identifier identifier) {
+        LOG.debug("{}: Removing state for {}", logContext, identifier);
+        stateCache.invalidate(identifier);
+    }
+
+    private void stateRemoved(RemovalNotification<Identifier, AssembledMessageState> notification) {
+        if (notification.wasEvicted()) {
+            LOG.warn("{}: AssembledMessageState for {} was expired from the cache", logContext, notification.getKey());
+        } else {
+            LOG.debug("{}: AssembledMessageState for {} was removed from the cache due to {}", logContext,
+                    notification.getKey(), notification.getCause());
+        }
+
+        notification.getValue().close();
+    }
+
+    @VisibleForTesting
+    boolean hasState(Identifier forIdentifier) {
+        boolean exists = stateCache.getIfPresent(forIdentifier) != null;
+        stateCache.cleanUp();
+        return exists;
+    }
+
+    public static class Builder {
+        private FileBackedOutputStreamFactory filedBackedStreamFactory;
+        private BiConsumer<Object, ActorRef> assembledMessageCallback;
+        private long expireStateAfterInactivityDuration = 1;
+        private TimeUnit expireStateAfterInactivityUnit = TimeUnit.MINUTES;
+        private String logContext = "<no-context>";
+
+        /**
+         * Sets the factory for creating FileBackedOutputStream instances used for streaming messages.
+         *
+         * @param newFiledBackedStreamFactory the factory for creating FileBackedOutputStream instances
+         * @return this Builder
+         */
+        public Builder filedBackedStreamFactory(final FileBackedOutputStreamFactory newFiledBackedStreamFactory) {
+            this.filedBackedStreamFactory = Preconditions.checkNotNull(newFiledBackedStreamFactory);
+            return this;
+        }
+
+        /**
+         * Sets the Consumer callback for assembled messages. The callback takes the assembled message and the
+         * original sender ActorRef as arguments.
+         *
+         * @param newAssembledMessageCallback the Consumer callback
+         * @return this Builder
+         */
+        public Builder assembledMessageCallback(final BiConsumer<Object, ActorRef> newAssembledMessageCallback) {
+            this.assembledMessageCallback = newAssembledMessageCallback;
+            return this;
+        }
+
+        /**
+         * Sets the duration and time unit whereby assembled message state is purged from the cache due to
+         * inactivity from the slicing component on the other end. By default, state is purged after 1 minute of
+         * inactivity.
+         *
+         * @param duration the length of time after which a state entry is purged
+         * @param unit the unit the duration is expressed in
+         * @return this Builder
+         */
+        public Builder expireStateAfterInactivity(final long duration, final TimeUnit unit) {
+            Preconditions.checkArgument(duration > 0, "duration must be > 0");
+            this.expireStateAfterInactivityDuration = duration;
+            this.expireStateAfterInactivityUnit = unit;
+            return this;
+        }
+
+        /**
+         * Sets the context for log messages.
+         *
+         * @param newLogContext the log context
+         * @return this Builder
+         */
+        public Builder logContext(final String newLogContext) {
+            this.logContext = newLogContext;
+            return this;
+        }
+
+        /**
+         * Builds a new MessageAssembler instance.
+         *
+         * @return a new MessageAssembler
+         */
+        public MessageAssembler build() {
+            return new MessageAssembler(this);
+        }
+    }
+}
diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/messaging/MessageSlice.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/messaging/MessageSlice.java
new file mode 100644 (file)
index 0000000..00ab31d
--- /dev/null
@@ -0,0 +1,127 @@
+/*
+ * Copyright (c) 2017 Inocybe Technologies and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.messaging;
+
+import akka.actor.ActorRef;
+import akka.serialization.JavaSerializer;
+import akka.serialization.Serialization;
+import com.google.common.base.Preconditions;
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.io.Serializable;
+import org.opendaylight.yangtools.concepts.Identifier;
+
+/**
+ * Represents a sliced message chunk.
+ *
+ * @author Thomas Pantelis
+ */
+public class MessageSlice implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    private final Identifier identifier;
+    private final byte[] data;
+    private final int sliceIndex;
+    private final int totalSlices;
+    private final int lastSliceHashCode;
+    private final ActorRef replyTo;
+
+    public MessageSlice(Identifier identifier, byte[] data, int sliceIndex, int totalSlices, int lastSliceHashCode,
+            final ActorRef replyTo) {
+        this.identifier = Preconditions.checkNotNull(identifier);
+        this.data = Preconditions.checkNotNull(data);
+        this.sliceIndex = sliceIndex;
+        this.totalSlices = totalSlices;
+        this.lastSliceHashCode = lastSliceHashCode;
+        this.replyTo = Preconditions.checkNotNull(replyTo);
+    }
+
+    public Identifier getIdentifier() {
+        return identifier;
+    }
+
+    @SuppressFBWarnings(value = "EI_EXPOSE_REP", justification = "Exposes a mutable object stored in a field but "
+            + "this is OK since this class is merely a DTO and does not process the byte[] internally."
+            + "Also it would be inefficient to create a return copy as the byte[] could be large.")
+    public byte[] getData() {
+        return data;
+    }
+
+    public int getSliceIndex() {
+        return sliceIndex;
+    }
+
+    public int getTotalSlices() {
+        return totalSlices;
+    }
+
+    public int getLastSliceHashCode() {
+        return lastSliceHashCode;
+    }
+
+    public ActorRef getReplyTo() {
+        return replyTo;
+    }
+
+    @Override
+    public String toString() {
+        return "MessageSlice [identifier=" + identifier + ", data.length=" + data.length + ", sliceIndex="
+                + sliceIndex + ", totalSlices=" + totalSlices + ", lastSliceHashCode=" + lastSliceHashCode
+                + ", replyTo=" + replyTo + "]";
+    }
+
+    private Object writeReplace() {
+        return new Proxy(this);
+    }
+
+    private static class Proxy implements Externalizable {
+        private static final long serialVersionUID = 1L;
+
+        private MessageSlice messageSlice;
+
+        // checkstyle flags the public modifier as redundant which really doesn't make sense since it clearly isn't
+        // redundant. It is explicitly needed for Java serialization to be able to create instances via reflection.
+        @SuppressWarnings("checkstyle:RedundantModifier")
+        public Proxy() {
+        }
+
+        Proxy(MessageSlice messageSlice) {
+            this.messageSlice = messageSlice;
+        }
+
+        @Override
+        public void writeExternal(ObjectOutput out) throws IOException {
+            out.writeObject(messageSlice.identifier);
+            out.writeInt(messageSlice.sliceIndex);
+            out.writeInt(messageSlice.totalSlices);
+            out.writeInt(messageSlice.lastSliceHashCode);
+            out.writeObject(messageSlice.data);
+            out.writeObject(Serialization.serializedActorPath(messageSlice.replyTo));
+        }
+
+        @Override
+        public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+            Identifier identifier = (Identifier) in.readObject();
+            int sliceIndex = in.readInt();
+            int totalSlices = in.readInt();
+            int lastSliceHashCode = in.readInt();
+            byte[] data = (byte[])in.readObject();
+            ActorRef replyTo = JavaSerializer.currentSystem().value().provider()
+                    .resolveActorRef((String) in.readObject());
+
+            messageSlice = new MessageSlice(identifier, data, sliceIndex, totalSlices, lastSliceHashCode, replyTo);
+        }
+
+        private Object readResolve() {
+            return messageSlice;
+        }
+    }
+}
diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/messaging/MessageSliceException.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/messaging/MessageSliceException.java
new file mode 100644 (file)
index 0000000..09ee723
--- /dev/null
@@ -0,0 +1,50 @@
+/*
+ * Copyright (c) 2017 Inocybe Technologies and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.messaging;
+
+/**
+ * An exception indicating a message slice failure.
+ *
+ * @author Thomas Pantelis
+ */
+public class MessageSliceException extends Exception {
+    private static final long serialVersionUID = 1L;
+
+    private final boolean isRetriable;
+
+    /**
+     * Constructs an instance.
+     *
+     * @param message the detail message
+     * @param cause the cause
+     */
+    public MessageSliceException(final String message, final Throwable cause) {
+        super(message, cause);
+        isRetriable = false;
+    }
+
+    /**
+     * Constructs an instance.
+     *
+     * @param message the detail message
+     * @param isRetriable if true, indicates the original operation can be retried
+     */
+    public MessageSliceException(final String message, final boolean isRetriable) {
+        super(message);
+        this.isRetriable = isRetriable;
+    }
+
+    /**
+     * Returns whether or not the original operation can be retried.
+     *
+     * @return true if it can be retried, false otherwise
+     */
+    public boolean isRetriable() {
+        return isRetriable;
+    }
+}
diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/messaging/MessageSliceIdentifier.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/messaging/MessageSliceIdentifier.java
new file mode 100644 (file)
index 0000000..2d1147b
--- /dev/null
@@ -0,0 +1,105 @@
+/*
+ * Copyright (c) 2017 Inocybe Technologies and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.messaging;
+
+import com.google.common.base.Preconditions;
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.concurrent.atomic.AtomicLong;
+import org.opendaylight.yangtools.concepts.Identifier;
+
+/**
+ * Identifier for a message slice that is composed of a client-supplied Identifier and an internal counter value.
+ *
+ * @author Thomas Pantelis
+ */
+final class MessageSliceIdentifier implements Identifier {
+    private static final long serialVersionUID = 1L;
+    private static final AtomicLong ID_COUNTER = new AtomicLong(1);
+
+    private final Identifier clientIdentifier;
+    private final long messageId;
+
+    MessageSliceIdentifier(final Identifier clientIdentifier) {
+        this(clientIdentifier, ID_COUNTER.getAndIncrement());
+    }
+
+    private MessageSliceIdentifier(final Identifier clientIdentifier, final long messageId) {
+        this.clientIdentifier = Preconditions.checkNotNull(clientIdentifier);
+        this.messageId = messageId;
+    }
+
+    Identifier getClientIdentifier() {
+        return clientIdentifier;
+    }
+
+    @Override
+    public int hashCode() {
+        final int prime = 31;
+        int result = 1;
+        result = prime * result + clientIdentifier.hashCode();
+        result = prime * result + (int) (messageId ^ messageId >>> 32);
+        return result;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj) {
+            return true;
+        }
+
+        if (!(obj instanceof MessageSliceIdentifier)) {
+            return false;
+        }
+
+        MessageSliceIdentifier other = (MessageSliceIdentifier) obj;
+        return other.clientIdentifier.equals(clientIdentifier) && other.messageId == messageId;
+    }
+
+    @Override
+    public String toString() {
+        return "MessageSliceIdentifier [clientIdentifier=" + clientIdentifier + ", messageId=" + messageId + "]";
+    }
+
+    private Object writeReplace() {
+        return new Proxy(this);
+    }
+
+    private static class Proxy implements Externalizable {
+        private static final long serialVersionUID = 1L;
+
+        private MessageSliceIdentifier messageSliceId;
+
+        // checkstyle flags the public modifier as redundant which really doesn't make sense since it clearly isn't
+        // redundant. It is explicitly needed for Java serialization to be able to create instances via reflection.
+        @SuppressWarnings("checkstyle:RedundantModifier")
+        public Proxy() {
+        }
+
+        Proxy(MessageSliceIdentifier messageSliceId) {
+            this.messageSliceId = messageSliceId;
+        }
+
+        @Override
+        public void writeExternal(ObjectOutput out) throws IOException {
+            out.writeObject(messageSliceId.clientIdentifier);
+            out.writeLong(messageSliceId.messageId);
+        }
+
+        @Override
+        public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+            messageSliceId = new MessageSliceIdentifier((Identifier) in.readObject(), in.readLong());
+        }
+
+        private Object readResolve() {
+            return messageSliceId;
+        }
+    }
+}
diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/messaging/MessageSliceReply.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/messaging/MessageSliceReply.java
new file mode 100644 (file)
index 0000000..dc80c33
--- /dev/null
@@ -0,0 +1,116 @@
+/*
+ * Copyright (c) 2017 Inocybe Technologies and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.messaging;
+
+import akka.actor.ActorRef;
+import akka.serialization.JavaSerializer;
+import akka.serialization.Serialization;
+import com.google.common.base.Preconditions;
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.io.Serializable;
+import java.util.Optional;
+import org.opendaylight.yangtools.concepts.Identifier;
+
+/**
+ * The reply message for {@link MessageSlice}.
+ *
+ * @author Thomas Pantelis
+ */
+public class MessageSliceReply implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    private final Identifier identifier;
+    private final int sliceIndex;
+    private final MessageSliceException failure;
+    private final ActorRef sendTo;
+
+    private MessageSliceReply(final Identifier identifier, final int sliceIndex, final MessageSliceException failure,
+            final ActorRef sendTo) {
+        this.identifier = Preconditions.checkNotNull(identifier);
+        this.sliceIndex = sliceIndex;
+        this.sendTo = Preconditions.checkNotNull(sendTo);
+        this.failure = failure;
+    }
+
+    public static MessageSliceReply success(final Identifier identifier, final int sliceIndex, final ActorRef sendTo) {
+        return new MessageSliceReply(identifier, sliceIndex, null, sendTo);
+    }
+
+    public static MessageSliceReply failed(final Identifier identifier, final MessageSliceException failure,
+            final ActorRef sendTo) {
+        return new MessageSliceReply(identifier, -1, failure, sendTo);
+    }
+
+    public Identifier getIdentifier() {
+        return identifier;
+    }
+
+    public int getSliceIndex() {
+        return sliceIndex;
+    }
+
+    public ActorRef getSendTo() {
+        return sendTo;
+    }
+
+    public Optional<MessageSliceException> getFailure() {
+        return Optional.ofNullable(failure);
+    }
+
+    @Override
+    public String toString() {
+        return "MessageSliceReply [identifier=" + identifier + ", sliceIndex=" + sliceIndex + ", failure=" + failure
+                + ", sendTo=" + sendTo + "]";
+    }
+
+    private Object writeReplace() {
+        return new Proxy(this);
+    }
+
+    private static class Proxy implements Externalizable {
+        private static final long serialVersionUID = 1L;
+
+        private MessageSliceReply messageSliceReply;
+
+        // checkstyle flags the public modifier as redundant which really doesn't make sense since it clearly isn't
+        // redundant. It is explicitly needed for Java serialization to be able to create instances via reflection.
+        @SuppressWarnings("checkstyle:RedundantModifier")
+        public Proxy() {
+        }
+
+        Proxy(MessageSliceReply messageSliceReply) {
+            this.messageSliceReply = messageSliceReply;
+        }
+
+        @Override
+        public void writeExternal(ObjectOutput out) throws IOException {
+            out.writeObject(messageSliceReply.identifier);
+            out.writeInt(messageSliceReply.sliceIndex);
+            out.writeObject(messageSliceReply.failure);
+            out.writeObject(Serialization.serializedActorPath(messageSliceReply.sendTo));
+        }
+
+        @Override
+        public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+            final Identifier identifier = (Identifier) in.readObject();
+            final int sliceIndex = in.readInt();
+            final MessageSliceException failure = (MessageSliceException) in.readObject();
+            ActorRef sendTo = JavaSerializer.currentSystem().value().provider()
+                    .resolveActorRef((String) in.readObject());
+
+            messageSliceReply = new MessageSliceReply(identifier, sliceIndex, failure, sendTo);
+        }
+
+        private Object readResolve() {
+            return messageSliceReply;
+        }
+    }
+}
diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/messaging/MessageSlicer.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/messaging/MessageSlicer.java
new file mode 100644 (file)
index 0000000..484a5c2
--- /dev/null
@@ -0,0 +1,374 @@
+/*
+ * Copyright (c) 2017 Inocybe Technologies and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.messaging;
+
+import akka.actor.ActorRef;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
+import java.io.IOException;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+import org.opendaylight.controller.cluster.io.FileBackedOutputStream;
+import org.opendaylight.controller.cluster.io.FileBackedOutputStreamFactory;
+import org.opendaylight.yangtools.concepts.Identifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class slices messages into smaller chunks. {@link MessageAssembler} is used to re-assemble the messages.
+ *
+ * @author Thomas Pantelis
+ * @see MessageAssembler
+ */
+public class MessageSlicer implements AutoCloseable {
+    private static final Logger LOG = LoggerFactory.getLogger(MessageSlicer.class);
+    public static final int DEFAULT_MAX_SLICING_TRIES = 3;
+
+    private final Cache<Identifier, SlicedMessageState<ActorRef>> stateCache;
+    private final FileBackedOutputStreamFactory filedBackedStreamFactory;
+    private final int messageSliceSize;
+    private final int maxSlicingTries;
+    private final String logContext;
+
+    private MessageSlicer(Builder builder) {
+        this.filedBackedStreamFactory = builder.filedBackedStreamFactory;
+        this.messageSliceSize = builder.messageSliceSize;
+        this.maxSlicingTries = builder.maxSlicingTries;
+        this.logContext = builder.logContext;
+
+        CacheBuilder<Identifier, SlicedMessageState<ActorRef>> cacheBuilder = CacheBuilder.newBuilder().removalListener(
+                (RemovalListener<Identifier, SlicedMessageState<ActorRef>>) notification -> stateRemoved(notification));
+        if (builder.expireStateAfterInactivityDuration > 0) {
+            cacheBuilder = cacheBuilder.expireAfterAccess(builder.expireStateAfterInactivityDuration,
+                    builder.expireStateAfterInactivityUnit);
+        }
+
+        stateCache = cacheBuilder.build();
+    }
+
+    /**
+     * Returns a new Builder for creating MessageSlicer instances.
+     *
+     * @return a Builder instance
+     */
+    public static Builder builder() {
+        return new Builder();
+    }
+
+    /**
+     * Checks if the given message is handled by this class. If so, it should be forwarded to the
+     * {@link #handleMessage(Object)} method
+     *
+     * @param message the message to check
+     * @return true if handled, false otherwise
+     */
+    public static boolean isHandledMessage(Object message) {
+        return message instanceof MessageSliceReply;
+    }
+
+    /**
+     * Slices a message into chunks based on the serialized size, the maximum message slice size and the given
+     * options.
+     *
+     * @param options the SliceOptions
+     */
+    public void slice(SliceOptions options) {
+        final Identifier identifier = options.getIdentifier();
+        final Serializable message = options.getMessage();
+        final FileBackedOutputStream fileBackedStream;
+        if (message != null) {
+            LOG.debug("{}: slice: identifier: {}, message: {}", logContext, identifier, message);
+
+
+            Preconditions.checkNotNull(filedBackedStreamFactory,
+                    "The FiledBackedStreamFactory must be set in order to call this slice method");
+
+            // Serialize the message to a FileBackedOutputStream.
+            fileBackedStream = filedBackedStreamFactory.newInstance();
+            try (ObjectOutputStream out = new ObjectOutputStream(fileBackedStream)) {
+                out.writeObject(message);
+            } catch (IOException e) {
+                LOG.debug("{}: Error serializing message for {}", logContext, identifier, e);
+                fileBackedStream.cleanup();
+                options.getOnFailureCallback().accept(e);
+                return;
+            }
+        } else {
+            fileBackedStream = options.getFileBackedStream();
+        }
+
+        initializeSlicing(options, fileBackedStream);
+    }
+
+    private void initializeSlicing(final SliceOptions options, final FileBackedOutputStream fileBackedStream) {
+        final Identifier identifier = options.getIdentifier();
+        MessageSliceIdentifier messageSliceId = new MessageSliceIdentifier(identifier);
+        SlicedMessageState<ActorRef> state = null;
+        try {
+            state = new SlicedMessageState<>(messageSliceId, fileBackedStream, messageSliceSize, maxSlicingTries,
+                    options.getReplyTo(), options.getOnFailureCallback(), logContext);
+
+            final Serializable message = options.getMessage();
+            if (state.getTotalSlices() == 1 && message != null) {
+                LOG.debug("{}: Message does not need to be sliced - sending original message", logContext);
+                state.close();
+                sendTo(options, message, options.getReplyTo());
+                return;
+            }
+
+            final MessageSlice firstSlice = getNextSliceMessage(state);
+
+            LOG.debug("{}: Sending first slice: {}", logContext, firstSlice);
+
+            stateCache.put(messageSliceId, state);
+            sendTo(options, firstSlice, ActorRef.noSender());
+        } catch (IOException e) {
+            LOG.error("{}: Error initializing SlicedMessageState for {}", logContext, identifier, e);
+            if (state != null) {
+                state.close();
+            } else {
+                fileBackedStream.cleanup();
+            }
+
+            options.getOnFailureCallback().accept(e);
+        }
+    }
+
+    private void sendTo(SliceOptions options, Object message, ActorRef sender) {
+        if (options.getSendToRef() != null) {
+            options.getSendToRef().tell(message, sender);
+        } else {
+            options.getSendToSelection().tell(message, sender);
+        }
+    }
+
+    /**
+     * Invoked to handle messages pertaining to this class.
+     *
+     * @param message the message
+     * @return true if the message was handled, false otherwise
+     */
+    public boolean handleMessage(final Object message) {
+        if (message instanceof MessageSliceReply) {
+            LOG.debug("{}: handleMessage: {}", logContext, message);
+            onMessageSliceReply((MessageSliceReply) message);
+            return true;
+        }
+
+        return false;
+    }
+
+    /**
+     * Checks for and removes sliced message state that has expired due to inactivity from the assembling component
+     * on the other end.
+     */
+    public void checkExpiredSlicedMessageState() {
+        if (stateCache.size() > 0) {
+            stateCache.cleanUp();
+        }
+    }
+
+    /**
+     * Closes and removes all in-progress sliced message state.
+     */
+    @Override
+    public void close() {
+        LOG.debug("{}: Closing", logContext);
+        stateCache.invalidateAll();
+    }
+
+    private MessageSlice getNextSliceMessage(SlicedMessageState<ActorRef> state) throws IOException {
+        final byte[] firstSliceBytes = state.getNextSlice();
+        return new MessageSlice(state.getIdentifier(), firstSliceBytes, state.getCurrentSliceIndex(),
+                state.getTotalSlices(), state.getLastSliceHashCode(), state.getReplyTarget());
+    }
+
+    private void onMessageSliceReply(final MessageSliceReply reply) {
+        final Identifier identifier = reply.getIdentifier();
+        final SlicedMessageState<ActorRef> state = stateCache.getIfPresent(identifier);
+        if (state == null) {
+            LOG.warn("{}: SlicedMessageState not found for {}", logContext, reply);
+            reply.getSendTo().tell(new AbortSlicing(identifier), ActorRef.noSender());
+            return;
+        }
+
+        synchronized (state) {
+            try {
+                final Optional<MessageSliceException> failure = reply.getFailure();
+                if (failure.isPresent()) {
+                    LOG.warn("{}: Received failed {}", logContext, reply);
+                    processMessageSliceException(failure.get(), state, reply.getSendTo());
+                    return;
+                }
+
+                if (state.getCurrentSliceIndex() != reply.getSliceIndex()) {
+                    LOG.warn("{}: Slice index {} in {} does not match expected index {}", logContext,
+                            reply.getSliceIndex(), reply, state.getCurrentSliceIndex());
+                    reply.getSendTo().tell(new AbortSlicing(identifier), ActorRef.noSender());
+                    possiblyRetrySlicing(state, reply.getSendTo());
+                    return;
+                }
+
+                if (state.isLastSlice(reply.getSliceIndex())) {
+                    LOG.debug("{}: Received last slice reply for {}", logContext, identifier);
+                    removeState(identifier);
+                } else {
+                    final MessageSlice nextSlice = getNextSliceMessage(state);
+                    LOG.debug("{}: Sending next slice: {}", logContext, nextSlice);
+                    reply.getSendTo().tell(nextSlice, ActorRef.noSender());
+                }
+            } catch (IOException e) {
+                LOG.warn("{}: Error processing {}", logContext, reply, e);
+                fail(state, e);
+            }
+        }
+    }
+
+    private void processMessageSliceException(final MessageSliceException exception,
+            final SlicedMessageState<ActorRef> state, final ActorRef sendTo) throws IOException {
+        if (exception.isRetriable()) {
+            possiblyRetrySlicing(state, sendTo);
+        } else {
+            fail(state, exception.getCause() != null ? exception.getCause() : exception);
+        }
+    }
+
+    private void possiblyRetrySlicing(final SlicedMessageState<ActorRef> state, final ActorRef sendTo)
+            throws IOException {
+        if (state.canRetry()) {
+            LOG.info("{}: Retrying message slicing for {}", logContext, state.getIdentifier());
+            state.reset();
+            sendTo.tell(getNextSliceMessage(state), ActorRef.noSender());
+        } else {
+            String message = String.format("Maximum slicing retries reached for identifier %s - failing the message",
+                    state.getIdentifier());
+            LOG.warn(message);
+            fail(state, new RuntimeException(message));
+        }
+    }
+
+    private void removeState(final Identifier identifier) {
+        LOG.debug("{}: Removing state for {}", logContext, identifier);
+        stateCache.invalidate(identifier);
+    }
+
+    private void stateRemoved(RemovalNotification<Identifier, SlicedMessageState<ActorRef>> notification) {
+        final SlicedMessageState<ActorRef> state = notification.getValue();
+        state.close();
+        if (notification.wasEvicted()) {
+            LOG.warn("{}: SlicedMessageState for {} was expired from the cache", logContext, notification.getKey());
+            state.getOnFailureCallback().accept(new RuntimeException(String.format(
+                    "The slicing state for message identifier %s was expired due to inactivity from the assembling "
+                     + "component on the other end", state.getIdentifier())));
+        } else {
+            LOG.debug("{}: SlicedMessageState for {} was removed from the cache due to {}", logContext,
+                    notification.getKey(), notification.getCause());
+        }
+    }
+
+    private void fail(final SlicedMessageState<ActorRef> state, final Throwable failure) {
+        removeState(state.getIdentifier());
+        state.getOnFailureCallback().accept(failure);
+    }
+
+    @VisibleForTesting
+    boolean hasState(Identifier forIdentifier) {
+        boolean exists = stateCache.getIfPresent(forIdentifier) != null;
+        stateCache.cleanUp();
+        return exists;
+    }
+
+    public static class Builder {
+        private FileBackedOutputStreamFactory filedBackedStreamFactory;
+        private int messageSliceSize = -1;
+        private long expireStateAfterInactivityDuration = -1;
+        private TimeUnit expireStateAfterInactivityUnit = TimeUnit.MINUTES;
+        private int maxSlicingTries = DEFAULT_MAX_SLICING_TRIES;
+        private String logContext = "<no-context>";
+
+        /**
+         * Sets the factory for creating FileBackedOutputStream instances used for streaming messages. This factory
+         * is used by the {@link MessageSlicer#slice(SliceOptions)} method if a Serializable message is passed.
+         * If Serializable messages aren't passed then the factory need not be set.
+         *
+         * @param newFiledBackedStreamFactory the factory for creating FileBackedOutputStream instances
+         * @return this Builder
+         */
+        public Builder filedBackedStreamFactory(final FileBackedOutputStreamFactory newFiledBackedStreamFactory) {
+            this.filedBackedStreamFactory = Preconditions.checkNotNull(newFiledBackedStreamFactory);
+            return this;
+        }
+
+        /**
+         * Sets the maximum size (in bytes) for a message slice.
+         *
+         * @param newMessageSliceSize the maximum size (in bytes)
+         * @return this Builder
+         */
+        public Builder messageSliceSize(final int newMessageSliceSize) {
+            Preconditions.checkArgument(newMessageSliceSize > 0, "messageSliceSize must be > 0");
+            this.messageSliceSize = newMessageSliceSize;
+            return this;
+        }
+
+        /**
+         * Sets the maximum number of tries for slicing a message. If exceeded, slicing fails. The default is
+         * defined by {@link #DEFAULT_MAX_SLICING_TRIES}
+         *
+         * @param newMaxSlicingTries the maximum number of tries
+         * @return this Builder
+         */
+        public Builder maxSlicingTries(final int newMaxSlicingTries) {
+            Preconditions.checkArgument(newMaxSlicingTries > 0, "newMaxSlicingTries must be > 0");
+            this.maxSlicingTries = newMaxSlicingTries;
+            return this;
+        }
+
+        /**
+         * Sets the duration and time unit whereby sliced message state is purged from the cache and the associated
+         * failure callback is notified due to inactivity from the assembling component on the other end. By default,
+         * state is not purged due to inactivity.
+         *
+         * @param duration the length of time after which a state entry is purged
+         * @param unit the unit the duration is expressed in
+         * @return this Builder
+         */
+        public Builder expireStateAfterInactivity(final long duration, final TimeUnit unit) {
+            Preconditions.checkArgument(duration > 0, "duration must be > 0");
+            this.expireStateAfterInactivityDuration = duration;
+            this.expireStateAfterInactivityUnit = unit;
+            return this;
+        }
+
+        /**
+         * Sets the context for log messages.
+         *
+         * @param newLogContext the log context
+         * @return this Builder
+         */
+        public Builder logContext(final String newLogContext) {
+            this.logContext = Preconditions.checkNotNull(newLogContext);
+            return this;
+        }
+
+        /**
+         * Builds a new MessageSlicer instance.
+         *
+         * @return a new MessageSlicer
+         */
+        public MessageSlicer build() {
+            return new MessageSlicer(this);
+        }
+    }
+}
diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/messaging/SliceOptions.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/messaging/SliceOptions.java
new file mode 100644 (file)
index 0000000..630a554
--- /dev/null
@@ -0,0 +1,196 @@
+/*
+ * Copyright (c) 2017 Inocybe Technologies and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.messaging;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSelection;
+import com.google.common.base.Preconditions;
+import java.io.Serializable;
+import java.util.function.Consumer;
+import org.opendaylight.controller.cluster.io.FileBackedOutputStream;
+import org.opendaylight.controller.cluster.io.FileBackedOutputStreamFactory;
+import org.opendaylight.yangtools.concepts.Identifier;
+
+/**
+ * Options for slicing a message with {@link MessageSlicer#slice(SliceOptions)}.
+ *
+ * @author Thomas Pantelis
+ */
+public class SliceOptions {
+    private final Builder builder;
+
+    private SliceOptions(Builder builder) {
+        this.builder = builder;
+    }
+
+    public Identifier getIdentifier() {
+        return builder.identifier;
+    }
+
+    public FileBackedOutputStream getFileBackedStream() {
+        return builder.fileBackedStream;
+    }
+
+    public Serializable getMessage() {
+        return builder.message;
+    }
+
+    public ActorRef getSendToRef() {
+        return builder.sendToRef;
+    }
+
+    public ActorSelection getSendToSelection() {
+        return builder.sendToSelection;
+    }
+
+    public ActorRef getReplyTo() {
+        return builder.replyTo;
+    }
+
+    public Consumer<Throwable> getOnFailureCallback() {
+        return builder.onFailureCallback;
+    }
+
+    /**
+     * Returns a new Builder for creating MessageSlicer instances.
+     *
+     * @return a Builder instance
+     */
+    public static Builder builder() {
+        return new Builder();
+    }
+
+    public static class Builder {
+        private Identifier identifier;
+        private FileBackedOutputStream fileBackedStream;
+        private Serializable message;
+        private ActorRef sendToRef;
+        private ActorSelection sendToSelection;
+        private ActorRef replyTo;
+        private Consumer<Throwable> onFailureCallback;
+        private boolean sealed;
+
+        /**
+         * Sets the identifier of the component to slice.
+         *
+         * @param newIdentifier the identifier
+         * @return this Builder
+         */
+        public Builder identifier(final Identifier newIdentifier) {
+            checkSealed();
+            identifier = newIdentifier;
+            return this;
+        }
+
+        /**
+         * Sets the {@link FileBackedOutputStream} containing the message data to slice.
+         *
+         * @param newFileBackedStream the {@link FileBackedOutputStream}
+         * @return this Builder
+         */
+        public Builder fileBackedOutputStream(final FileBackedOutputStream newFileBackedStream) {
+            checkSealed();
+            fileBackedStream = newFileBackedStream;
+            return this;
+        }
+
+        /**
+         * Sets the message to slice. The message is first serialized to a {@link FileBackedOutputStream}. If the
+         * message doesn't need to be sliced, ie its serialized size is less than the maximum message slice size, then
+         * the original message is sent. Otherwise the first message slice is sent.
+         *
+         * <p>
+         * <b>Note:</b> a {@link FileBackedOutputStreamFactory} must be set in the {@link MessageSlicer}.
+         *
+         * @param newMessage the message
+         * @param <T> the Serializable message type
+         * @return this Builder
+         */
+        public <T extends Serializable> Builder message(final T newMessage) {
+            checkSealed();
+            message = newMessage;
+            return this;
+        }
+
+        /**
+         * Sets the reference of the actor to which to send the message slices.
+         *
+         * @param sendTo the ActorRef
+         * @return this Builder
+         */
+        public Builder sendTo(final ActorRef sendTo) {
+            checkSealed();
+            sendToRef = sendTo;
+            return this;
+        }
+
+        /**
+         * Sets the ActorSelection to which to send the message slices.
+         *
+         * @param sendTo the ActorSelection
+         * @return this Builder
+         */
+        public Builder sendTo(final ActorSelection sendTo) {
+            checkSealed();
+            sendToSelection = sendTo;
+            return this;
+        }
+
+        /**
+         * Sets the reference of the actor to which message slice replies should be sent. The actor should
+         * forward the replies to the {@link MessageSlicer#handleMessage(Object)} method.
+         *
+         * @param newReplyTo the ActorRef
+         * @return this Builder
+         */
+        public Builder replyTo(final ActorRef newReplyTo) {
+            checkSealed();
+            replyTo = newReplyTo;
+            return this;
+        }
+
+        /**
+         * Sets the callback to be notified of failure.
+         *
+         * @param newOnFailureCallback the callback
+         * @return this Builder
+         */
+        public Builder onFailureCallback(final Consumer<Throwable> newOnFailureCallback) {
+            checkSealed();
+            onFailureCallback = newOnFailureCallback;
+            return this;
+        }
+
+        /**
+         * Builds a new SliceOptions instance.
+         *
+         * @return a new SliceOptions
+         */
+        public SliceOptions build() {
+            sealed = true;
+
+            Preconditions.checkNotNull(identifier, "identifier must be set");
+            Preconditions.checkNotNull(replyTo, "replyTo must be set");
+            Preconditions.checkNotNull(onFailureCallback, "onFailureCallback must be set");
+            Preconditions.checkState(fileBackedStream == null || message == null,
+                    "Only one of message and fileBackedStream can be set");
+            Preconditions.checkState(!(fileBackedStream == null && message == null),
+                    "One of message and fileBackedStream must be set");
+            Preconditions.checkState(sendToRef == null || sendToSelection == null,
+                    "Only one of sendToRef and sendToSelection can be set");
+            Preconditions.checkState(!(sendToRef == null && sendToSelection == null),
+                    "One of sendToRef and sendToSelection must be set");
+
+            return new SliceOptions(this);
+        }
+
+        protected void checkSealed() {
+            Preconditions.checkState(!sealed, "Builder is already sealed - further modifications are not allowed");
+        }
+    }
+}
diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/messaging/SlicedMessageState.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/messaging/SlicedMessageState.java
new file mode 100644 (file)
index 0000000..8c3cb51
--- /dev/null
@@ -0,0 +1,239 @@
+/*
+ * Copyright (c) 2017 Inocybe Technologies and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.messaging;
+
+import com.google.common.io.ByteSource;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Arrays;
+import java.util.function.Consumer;
+import javax.annotation.concurrent.NotThreadSafe;
+import org.opendaylight.controller.cluster.io.FileBackedOutputStream;
+import org.opendaylight.yangtools.concepts.Identifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Maintains the state of a sliced message.
+ *
+ * @author Thomas Pantelis
+ * @see MessageSlicer
+ */
+@NotThreadSafe
+public class SlicedMessageState<T> implements AutoCloseable {
+    private static final Logger LOG = LoggerFactory.getLogger(SlicedMessageState.class);
+
+    // The index of the first slice that is sent.
+    static final int FIRST_SLICE_INDEX = 1;
+
+    // The initial hash code for a slice.
+    static final int INITIAL_SLICE_HASH_CODE = -1;
+
+    private final Identifier identifier;
+    private final int messageSliceSize;
+    private final FileBackedOutputStream fileBackedStream;
+    private final T replyTarget;
+    private final ByteSource messageBytes;
+    private final int totalSlices;
+    private final long totalMessageSize;
+    private final int maxRetries;
+    private final Consumer<Throwable> onFailureCallback;
+    private final String logContext;
+
+    private int currentByteOffset = 0;
+    private int currentSliceIndex = FIRST_SLICE_INDEX - 1;
+    private int lastSliceHashCode = INITIAL_SLICE_HASH_CODE;
+    private int currentSliceHashCode = INITIAL_SLICE_HASH_CODE;
+    private int tryCount = 1;
+    private InputStream messageInputStream;
+
+    /**
+     * Constructor.
+     *
+     * @param identifier the identifier for this instance
+     * @param fileBackedStream the FileBackedOutputStream containing the serialized data to slice
+     * @param messageSliceSize the maximum size (in bytes) for a message slice
+     * @param maxRetries the maximum number of retries
+     * @param replyTarget the user-defined target for sliced message replies
+     * @param onFailureCallback the callback to notify on failure
+     * @param logContext the context for log messages
+     * @throws IOException if an error occurs opening the input stream
+     */
+    public SlicedMessageState(final Identifier identifier, final FileBackedOutputStream fileBackedStream,
+            final int messageSliceSize, final int maxRetries, final T replyTarget,
+            final Consumer<Throwable> onFailureCallback, final String logContext) throws IOException {
+        this.identifier = identifier;
+        this.fileBackedStream = fileBackedStream;
+        this.messageSliceSize = messageSliceSize;
+        this.maxRetries = maxRetries;
+        this.replyTarget = replyTarget;
+        this.onFailureCallback = onFailureCallback;
+        this.logContext = logContext;
+
+        messageBytes = fileBackedStream.asByteSource();
+        totalMessageSize = messageBytes.size();
+        messageInputStream = messageBytes.openStream();
+
+        totalSlices = (int)(totalMessageSize / messageSliceSize + (totalMessageSize % messageSliceSize > 0 ? 1 : 0));
+
+        LOG.debug("{}: Message size: {} bytes, total slices to send: {}", logContext, totalMessageSize, totalSlices);
+    }
+
+    /**
+     * Returns the current slice index that has been sent.
+     *
+     * @return the current slice index that has been sent
+     */
+    public int getCurrentSliceIndex() {
+        return currentSliceIndex;
+    }
+
+    /**
+     * Returns the hash code of the last slice that was sent.
+     *
+     * @return the hash code of the last slice that was sent
+     */
+    public int getLastSliceHashCode() {
+        return lastSliceHashCode;
+    }
+
+    /**
+     * Returns the total number of slices to send.
+     *
+     * @return the total number of slices to send
+     */
+    public int getTotalSlices() {
+        return totalSlices;
+    }
+
+    /**
+     * Returns the identifier of this instance.
+     *
+     * @return the identifier
+     */
+    public Identifier getIdentifier() {
+        return identifier;
+    }
+
+    /**
+     * Returns the user-defined target for sliced message replies.
+     *
+     * @return the user-defined target
+     */
+    public T getReplyTarget() {
+        return replyTarget;
+    }
+
+    /**
+     *  Returns the callback to notify on failure.
+     *
+     * @return the callback to notify on failure
+     */
+    public Consumer<Throwable> getOnFailureCallback() {
+        return onFailureCallback;
+    }
+
+    /**
+     * Determines if the slicing can be retried.
+     *
+     * @return true if the slicing can be retried, false if the maximum number of retries has been reached
+     */
+    public boolean canRetry() {
+        return tryCount <= maxRetries;
+    }
+
+    /**
+     * Determines if the given index is the last slice to send.
+     *
+     * @param index the slice index to test
+     * @return true if the index is the last slice, false otherwise
+     */
+    public boolean isLastSlice(int index) {
+        return totalSlices == index;
+    }
+
+    /**
+     * Reads and returns the next slice of data.
+     *
+     * @return the next slice of data as a byte[]
+     * @throws IOException if an error occurs reading the data
+     */
+    public byte[] getNextSlice() throws IOException {
+        currentSliceIndex++;
+        final int start;
+        if (currentSliceIndex == FIRST_SLICE_INDEX) {
+            start = 0;
+        } else {
+            start = incrementByteOffset();
+        }
+
+        final int size;
+        if (messageSliceSize > totalMessageSize) {
+            size = (int) totalMessageSize;
+        } else if (start + messageSliceSize > totalMessageSize) {
+            size = (int) (totalMessageSize - start);
+        } else {
+            size = messageSliceSize;
+        }
+
+        LOG.debug("{}: getNextSlice: total size: {}, offset: {}, size: {}, index: {}", logContext, totalMessageSize,
+                start, size, currentSliceIndex);
+
+        byte[] nextSlice = new byte[size];
+        int numRead = messageInputStream.read(nextSlice);
+        if (numRead != size) {
+            throw new IOException(String.format(
+                    "The # of bytes read from the input stream, %d, does not match the expected # %d", numRead, size));
+        }
+
+        lastSliceHashCode = currentSliceHashCode;
+        currentSliceHashCode = Arrays.hashCode(nextSlice);
+
+        return nextSlice;
+    }
+
+    /**
+     * Resets this instance to restart slicing from the beginning.
+     *
+     * @throws IOException if an error occurs resetting the input stream
+     */
+    public void reset() throws IOException {
+        closeStream();
+
+        tryCount++;
+        currentByteOffset = 0;
+        currentSliceIndex = FIRST_SLICE_INDEX - 1;
+        lastSliceHashCode = INITIAL_SLICE_HASH_CODE;
+        currentSliceHashCode = INITIAL_SLICE_HASH_CODE;
+
+        messageInputStream = messageBytes.openStream();
+    }
+
+    private int incrementByteOffset() {
+        currentByteOffset  += messageSliceSize;
+        return currentByteOffset;
+    }
+
+    private void closeStream() {
+        if (messageInputStream != null) {
+            try {
+                messageInputStream.close();
+            } catch (IOException e) {
+                LOG.warn("{}: Error closing message stream", logContext, e);
+            }
+
+            messageInputStream = null;
+        }
+    }
+
+    @Override
+    public void close() {
+        closeStream();
+        fileBackedStream.cleanup();
+    }
+}
diff --git a/opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/cluster/messaging/AbortSlicingTest.java b/opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/cluster/messaging/AbortSlicingTest.java
new file mode 100644 (file)
index 0000000..4441857
--- /dev/null
@@ -0,0 +1,28 @@
+/*
+ * Copyright (c) 2017 Inocybe Technologies and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.messaging;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.commons.lang.SerializationUtils;
+import org.junit.Test;
+
+/**
+ * Unit tests for AbortSlicing.
+ *
+ * @author Thomas Pantelis
+ */
+public class AbortSlicingTest {
+
+    @Test
+    public void testSerialization() {
+        AbortSlicing expected = new AbortSlicing(new StringIdentifier("test"));
+        AbortSlicing cloned = (AbortSlicing) SerializationUtils.clone(expected);
+        assertEquals("getIdentifier", expected.getIdentifier(), cloned.getIdentifier());
+    }
+}
diff --git a/opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/cluster/messaging/AbstractMessagingTest.java b/opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/cluster/messaging/AbstractMessagingTest.java
new file mode 100644 (file)
index 0000000..3d93755
--- /dev/null
@@ -0,0 +1,81 @@
+/*
+ * Copyright (c) 2017 Inocybe Technologies and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.messaging;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
+
+import akka.actor.ActorSystem;
+import akka.testkit.JavaTestKit;
+import akka.testkit.TestProbe;
+import com.google.common.io.ByteSource;
+import java.io.IOException;
+import java.io.InputStream;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.opendaylight.controller.cluster.io.FileBackedOutputStream;
+import org.opendaylight.controller.cluster.io.FileBackedOutputStreamFactory;
+
+/**
+ * Abstract base class for messaging tests.
+ *
+ * @author Thomas Pantelis
+ */
+public class AbstractMessagingTest {
+    protected static final StringIdentifier IDENTIFIER = new StringIdentifier("test");
+    protected static ActorSystem ACTOR_SYSTEM;
+
+    protected final TestProbe testProbe = TestProbe.apply(ACTOR_SYSTEM);
+
+    @Mock
+    protected FileBackedOutputStreamFactory mockFiledBackedStreamFactory;
+
+    @Mock
+    protected FileBackedOutputStream mockFiledBackedStream;
+
+    @Mock
+    protected ByteSource mockByteSource;
+
+    @Mock
+    protected InputStream mockInputStream;
+
+    @BeforeClass
+    public static void setupClass() throws IOException {
+        ACTOR_SYSTEM = ActorSystem.create("test");
+    }
+
+    @Before
+    public void setup() throws IOException {
+        MockitoAnnotations.initMocks(this);
+
+        doReturn(mockFiledBackedStream).when(mockFiledBackedStreamFactory).newInstance();
+        doNothing().when(mockFiledBackedStream).write(any(byte[].class), anyInt(), anyInt());
+        doNothing().when(mockFiledBackedStream).write(any(byte[].class));
+        doNothing().when(mockFiledBackedStream).write(anyInt());
+        doNothing().when(mockFiledBackedStream).close();
+        doNothing().when(mockFiledBackedStream).cleanup();
+        doNothing().when(mockFiledBackedStream).flush();
+        doReturn(mockByteSource).when(mockFiledBackedStream).asByteSource();
+
+        doReturn(mockInputStream).when(mockByteSource).openStream();
+        doReturn(mockInputStream).when(mockByteSource).openBufferedStream();
+        doReturn(10L).when(mockByteSource).size();
+
+        doReturn(0).when(mockInputStream).read(any(byte[].class));
+    }
+
+    @AfterClass
+    public static void tearDownClass() {
+        JavaTestKit.shutdownActorSystem(ACTOR_SYSTEM, Boolean.TRUE);
+    }
+}
diff --git a/opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/cluster/messaging/BytesMessage.java b/opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/cluster/messaging/BytesMessage.java
new file mode 100644 (file)
index 0000000..1805bed
--- /dev/null
@@ -0,0 +1,51 @@
+/*
+ * Copyright (c) 2017 Inocybe Technologies and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.messaging;
+
+import java.io.Serializable;
+import java.util.Arrays;
+
+/**
+ * Serializable message that stores a byte[].
+ *
+ * @author Thomas Pantelis
+ */
+public class BytesMessage implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    private final byte[] bytes;
+
+    public BytesMessage(byte[] bytes) {
+        this.bytes = Arrays.copyOf(bytes, bytes.length);
+    }
+
+    @Override
+    public int hashCode() {
+        return Arrays.hashCode(bytes);
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj) {
+            return true;
+        }
+        if (obj == null) {
+            return false;
+        }
+        if (getClass() != obj.getClass()) {
+            return false;
+        }
+        BytesMessage other = (BytesMessage) obj;
+        return Arrays.equals(bytes, other.bytes);
+    }
+
+    @Override
+    public String toString() {
+        return "BytesMessage [bytes=" + Arrays.toString(bytes) + "]";
+    }
+}
diff --git a/opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/cluster/messaging/MessageAssemblerTest.java b/opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/cluster/messaging/MessageAssemblerTest.java
new file mode 100644 (file)
index 0000000..e3a68ee
--- /dev/null
@@ -0,0 +1,184 @@
+/*
+ * Copyright (c) 2017 Inocybe Technologies and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.messaging;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+import static org.opendaylight.controller.cluster.messaging.MessageSlicingIntegrationTest.assertAssembledMessage;
+import static org.opendaylight.controller.cluster.messaging.MessageSlicingIntegrationTest.assertFailedMessageSliceReply;
+import static org.opendaylight.controller.cluster.messaging.MessageSlicingIntegrationTest.assertSuccessfulMessageSliceReply;
+
+import akka.actor.ActorRef;
+import com.google.common.util.concurrent.Uninterruptibles;
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiConsumer;
+import org.apache.commons.lang3.SerializationUtils;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.opendaylight.controller.cluster.io.FileBackedOutputStream;
+import org.opendaylight.controller.cluster.messaging.MessageAssembler.Builder;
+
+/**
+ * Unit tests for MessageAssembler.
+ *
+ * @author Thomas Pantelis
+ */
+public class MessageAssemblerTest extends AbstractMessagingTest {
+
+    @Mock
+    private BiConsumer<Object, ActorRef> mockAssembledMessageCallback;
+
+    @Override
+    @Before
+    public void setup() throws IOException {
+        super.setup();
+
+        doNothing().when(mockAssembledMessageCallback).accept(any(Object.class), any(ActorRef.class));
+    }
+
+    @Test
+    public void testHandledMessages() {
+        final MessageSlice messageSlice = new MessageSlice(IDENTIFIER, new byte[0], 1, 1, 1, testProbe.ref());
+        final AbortSlicing abortSlicing = new AbortSlicing(IDENTIFIER);
+        assertEquals("isHandledMessage", Boolean.TRUE, MessageAssembler.isHandledMessage(messageSlice));
+        assertEquals("isHandledMessage", Boolean.TRUE, MessageAssembler.isHandledMessage(abortSlicing));
+        assertEquals("isHandledMessage", Boolean.FALSE, MessageSlicer.isHandledMessage(new Object()));
+
+        try (MessageAssembler assembler = newMessageAssembler("testHandledMessages")) {
+            assertEquals("handledMessage", Boolean.TRUE, assembler.handleMessage(messageSlice, testProbe.ref()));
+            assertEquals("handledMessage", Boolean.TRUE, assembler.handleMessage(abortSlicing, testProbe.ref()));
+            assertEquals("handledMessage", Boolean.FALSE, assembler.handleMessage(new Object(), testProbe.ref()));
+        }
+    }
+
+    @Test
+    public void testSingleMessageSlice() {
+        try (MessageAssembler assembler = newMessageAssembler("testSingleMessageSlice")) {
+            final FileBackedOutputStream fileBackStream = spy(new FileBackedOutputStream(100000000, null));
+            doReturn(fileBackStream).when(mockFiledBackedStreamFactory).newInstance();
+
+            final MessageSliceIdentifier identifier = new MessageSliceIdentifier(IDENTIFIER);
+            final BytesMessage message = new BytesMessage(new byte[]{1, 2, 3});
+
+            final MessageSlice messageSlice = new MessageSlice(identifier, SerializationUtils.serialize(message), 1, 1,
+                    SlicedMessageState.INITIAL_SLICE_HASH_CODE, testProbe.ref());
+            assembler.handleMessage(messageSlice, testProbe.ref());
+
+            final MessageSliceReply reply = testProbe.expectMsgClass(MessageSliceReply.class);
+            assertSuccessfulMessageSliceReply(reply, IDENTIFIER, 1);
+
+            assertAssembledMessage(mockAssembledMessageCallback, message, testProbe.ref());
+
+            assertFalse("MessageAssembler did not remove state for " + identifier, assembler.hasState(identifier));
+            verify(fileBackStream).cleanup();
+        }
+    }
+
+    @Test
+    public void testMessageSliceWithByteSourceFailure() throws IOException {
+        try (MessageAssembler assembler = newMessageAssembler("testMessageSliceWithByteSourceFailure")) {
+            IOException mockFailure = new IOException("mock IOException");
+            doThrow(mockFailure).when(mockByteSource).openStream();
+            doThrow(mockFailure).when(mockByteSource).openBufferedStream();
+
+            final MessageSliceIdentifier identifier = new MessageSliceIdentifier(IDENTIFIER);
+            final BytesMessage message = new BytesMessage(new byte[]{1, 2, 3});
+
+            final MessageSlice messageSlice = new MessageSlice(identifier, SerializationUtils.serialize(message), 1, 1,
+                    SlicedMessageState.INITIAL_SLICE_HASH_CODE, testProbe.ref());
+            assembler.handleMessage(messageSlice, testProbe.ref());
+
+            final MessageSliceReply reply = testProbe.expectMsgClass(MessageSliceReply.class);
+            assertFailedMessageSliceReply(reply, IDENTIFIER, false);
+            assertEquals("Failure cause", mockFailure, reply.getFailure().get().getCause());
+
+            assertFalse("MessageAssembler did not remove state for " + identifier, assembler.hasState(identifier));
+            verify(mockFiledBackedStream).cleanup();
+        }
+    }
+
+    @Test
+    public void testMessageSliceWithStreamWriteFailure() throws IOException {
+        try (MessageAssembler assembler = newMessageAssembler("testMessageSliceWithStreamWriteFailure")) {
+            IOException mockFailure = new IOException("mock IOException");
+            doThrow(mockFailure).when(mockFiledBackedStream).write(any(byte[].class), anyInt(), anyInt());
+            doThrow(mockFailure).when(mockFiledBackedStream).write(any(byte[].class));
+            doThrow(mockFailure).when(mockFiledBackedStream).write(anyInt());
+            doThrow(mockFailure).when(mockFiledBackedStream).flush();
+
+            final MessageSliceIdentifier identifier = new MessageSliceIdentifier(IDENTIFIER);
+            final BytesMessage message = new BytesMessage(new byte[]{1, 2, 3});
+
+            final MessageSlice messageSlice = new MessageSlice(identifier, SerializationUtils.serialize(message), 1, 1,
+                    SlicedMessageState.INITIAL_SLICE_HASH_CODE, testProbe.ref());
+            assembler.handleMessage(messageSlice, testProbe.ref());
+
+            final MessageSliceReply reply = testProbe.expectMsgClass(MessageSliceReply.class);
+            assertFailedMessageSliceReply(reply, IDENTIFIER, false);
+            assertEquals("Failure cause", mockFailure, reply.getFailure().get().getCause());
+
+            assertFalse("MessageAssembler did not remove state for " + identifier, assembler.hasState(identifier));
+            verify(mockFiledBackedStream).cleanup();
+        }
+    }
+
+    @Test
+    public void testAssembledMessageStateExpiration() throws IOException {
+        final int expiryDuration = 200;
+        try (MessageAssembler assembler = newMessageAssemblerBuilder("testAssembledMessageStateExpiration")
+                .expireStateAfterInactivity(expiryDuration, TimeUnit.MILLISECONDS).build()) {
+            final MessageSliceIdentifier identifier = new MessageSliceIdentifier(IDENTIFIER);
+            final BytesMessage message = new BytesMessage(new byte[]{1, 2, 3});
+
+            final MessageSlice messageSlice = new MessageSlice(identifier, SerializationUtils.serialize(message), 1, 2,
+                    SlicedMessageState.INITIAL_SLICE_HASH_CODE, testProbe.ref());
+            assembler.handleMessage(messageSlice, testProbe.ref());
+
+            final MessageSliceReply reply = testProbe.expectMsgClass(MessageSliceReply.class);
+            assertSuccessfulMessageSliceReply(reply, IDENTIFIER, 1);
+
+            assertTrue("MessageAssembler should have remove state for " + identifier, assembler.hasState(identifier));
+            Uninterruptibles.sleepUninterruptibly(expiryDuration + 50, TimeUnit.MILLISECONDS);
+            assertFalse("MessageAssembler did not remove state for " + identifier, assembler.hasState(identifier));
+
+            verify(mockFiledBackedStream).cleanup();
+        }
+    }
+
+    @Test
+    public void testFirstMessageSliceWithInvalidIndex() {
+        try (MessageAssembler assembler = newMessageAssembler("testFirstMessageSliceWithInvalidIndex")) {
+            final MessageSliceIdentifier identifier = new MessageSliceIdentifier(IDENTIFIER);
+            final MessageSlice messageSlice = new MessageSlice(identifier, new byte[0], 2, 3, 1, testProbe.ref());
+            assembler.handleMessage(messageSlice, testProbe.ref());
+
+            final MessageSliceReply reply = testProbe.expectMsgClass(MessageSliceReply.class);
+            assertFailedMessageSliceReply(reply, IDENTIFIER, true);
+            assertFalse("MessageAssembler should not have state for " + identifier, assembler.hasState(identifier));
+        }
+    }
+
+    private MessageAssembler newMessageAssembler(String logContext) {
+        return newMessageAssemblerBuilder(logContext).build();
+    }
+
+    private Builder newMessageAssemblerBuilder(String logContext) {
+        return MessageAssembler.builder().filedBackedStreamFactory(mockFiledBackedStreamFactory)
+                .assembledMessageCallback(mockAssembledMessageCallback).logContext(logContext);
+    }
+}
diff --git a/opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/cluster/messaging/MessageSliceIdentifierTest.java b/opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/cluster/messaging/MessageSliceIdentifierTest.java
new file mode 100644 (file)
index 0000000..addfc53
--- /dev/null
@@ -0,0 +1,28 @@
+/*
+ * Copyright (c) 2017 Inocybe Technologies and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.messaging;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.commons.lang.SerializationUtils;
+import org.junit.Test;
+
+/**
+ * Unit tests for MessageSliceIdentifier.
+ *
+ * @author Thomas Pantelis
+ */
+public class MessageSliceIdentifierTest {
+
+    @Test
+    public void testSerialization() {
+        MessageSliceIdentifier expected = new MessageSliceIdentifier(new StringIdentifier("test"));
+        MessageSliceIdentifier cloned = (MessageSliceIdentifier) SerializationUtils.clone(expected);
+        assertEquals("cloned", expected, cloned);
+    }
+}
diff --git a/opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/cluster/messaging/MessageSliceReplyTest.java b/opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/cluster/messaging/MessageSliceReplyTest.java
new file mode 100644 (file)
index 0000000..fa31fbf
--- /dev/null
@@ -0,0 +1,71 @@
+/*
+ * Copyright (c) 2017 Inocybe Technologies and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.messaging;
+
+import static org.junit.Assert.assertEquals;
+
+import akka.actor.ActorSystem;
+import akka.actor.ExtendedActorSystem;
+import akka.serialization.JavaSerializer;
+import akka.testkit.JavaTestKit;
+import akka.testkit.TestProbe;
+import org.apache.commons.lang.SerializationUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Unit tests for MessageSliceReply.
+ *
+ * @author Thomas Pantelis
+ */
+public class MessageSliceReplyTest {
+    private final ActorSystem actorSystem = ActorSystem.create("test");
+
+    @Before
+    public void setUp() {
+        JavaSerializer.currentSystem().value_$eq((ExtendedActorSystem) actorSystem);
+    }
+
+    @After
+    public void tearDown() {
+        JavaTestKit.shutdownActorSystem(actorSystem, Boolean.TRUE);
+    }
+
+    @Test
+    public void testSerialization() {
+        testSuccess();
+        testFailure();
+    }
+
+    private void testSuccess() {
+        MessageSliceReply expected = MessageSliceReply.success(new StringIdentifier("test"), 3,
+                TestProbe.apply(actorSystem).ref());
+        MessageSliceReply cloned = (MessageSliceReply) SerializationUtils.clone(expected);
+
+        assertEquals("getIdentifier", expected.getIdentifier(), cloned.getIdentifier());
+        assertEquals("getSliceIndex", expected.getSliceIndex(), cloned.getSliceIndex());
+        assertEquals("getSendTo", expected.getSendTo(), cloned.getSendTo());
+        assertEquals("getFailure present", Boolean.FALSE, cloned.getFailure().isPresent());
+    }
+
+    private void testFailure() {
+        MessageSliceReply expected = MessageSliceReply.failed(new StringIdentifier("test"),
+                new MessageSliceException("mock", true), TestProbe.apply(actorSystem).ref());
+        MessageSliceReply cloned = (MessageSliceReply) SerializationUtils.clone(expected);
+
+        assertEquals("getIdentifier", expected.getIdentifier(), cloned.getIdentifier());
+        assertEquals("getSliceIndex", expected.getSliceIndex(), cloned.getSliceIndex());
+        assertEquals("getSendTo", expected.getSendTo(), cloned.getSendTo());
+        assertEquals("getFailure present", Boolean.TRUE, cloned.getFailure().isPresent());
+        assertEquals("getFailure message", expected.getFailure().get().getMessage(),
+                cloned.getFailure().get().getMessage());
+        assertEquals("getFailure isRetriable", expected.getFailure().get().isRetriable(),
+                cloned.getFailure().get().isRetriable());
+    }
+}
diff --git a/opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/cluster/messaging/MessageSliceTest.java b/opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/cluster/messaging/MessageSliceTest.java
new file mode 100644 (file)
index 0000000..6331e0b
--- /dev/null
@@ -0,0 +1,62 @@
+/*
+ * Copyright (c) 2017 Inocybe Technologies and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.messaging;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+import akka.actor.ActorSystem;
+import akka.actor.ExtendedActorSystem;
+import akka.serialization.JavaSerializer;
+import akka.testkit.JavaTestKit;
+import akka.testkit.TestProbe;
+import org.apache.commons.lang.SerializationUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Unit tests for MessageSlice.
+ *
+ * @author Thomas Pantelis
+ */
+public class MessageSliceTest {
+    private final ActorSystem actorSystem = ActorSystem.create("test");
+
+    @Before
+    public void setUp() {
+        JavaSerializer.currentSystem().value_$eq((ExtendedActorSystem) actorSystem);
+    }
+
+    @After
+    public void tearDown() {
+        JavaTestKit.shutdownActorSystem(actorSystem, Boolean.TRUE);
+    }
+
+    @Test
+    public void testSerialization() {
+        byte[] data = new byte[1000];
+        for (int i = 0, j = 0; i < data.length; i++) {
+            data[i] = (byte)j;
+            if (++j >= 255) {
+                j = 0;
+            }
+        }
+
+        MessageSlice expected = new MessageSlice(new StringIdentifier("test"), data, 2, 3, 54321,
+                TestProbe.apply(actorSystem).ref());
+        MessageSlice cloned = (MessageSlice) SerializationUtils.clone(expected);
+
+        assertEquals("getIdentifier", expected.getIdentifier(), cloned.getIdentifier());
+        assertEquals("getSliceIndex", expected.getSliceIndex(), cloned.getSliceIndex());
+        assertEquals("getTotalSlices", expected.getTotalSlices(), cloned.getTotalSlices());
+        assertEquals("getLastSliceHashCode", expected.getLastSliceHashCode(), cloned.getLastSliceHashCode());
+        assertArrayEquals("getData", expected.getData(), cloned.getData());
+        assertEquals("getReplyTo", expected.getReplyTo(), cloned.getReplyTo());
+    }
+}
diff --git a/opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/cluster/messaging/MessageSlicerTest.java b/opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/cluster/messaging/MessageSlicerTest.java
new file mode 100644 (file)
index 0000000..20b6cab
--- /dev/null
@@ -0,0 +1,164 @@
+/*
+ * Copyright (c) 2017 Inocybe Technologies and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.messaging;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+
+import akka.actor.ActorRef;
+import com.google.common.util.concurrent.Uninterruptibles;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.opendaylight.yangtools.concepts.Identifier;
+
+/**
+ * Unit tests for MessageSlicer.
+ *
+ * @author Thomas Pantelis
+ */
+public class MessageSlicerTest extends AbstractMessagingTest {
+    @Mock
+    private Consumer<Throwable> mockOnFailureCallback;
+
+    @Override
+    @Before
+    public void setup() throws IOException {
+        super.setup();
+
+        doNothing().when(mockOnFailureCallback).accept(any(Throwable.class));
+    }
+
+    @Test
+    public void testHandledMessages() {
+        final MessageSliceReply reply = MessageSliceReply.success(IDENTIFIER, 1, testProbe.ref());
+        assertEquals("isHandledMessage", Boolean.TRUE, MessageSlicer.isHandledMessage(reply));
+        assertEquals("isHandledMessage", Boolean.FALSE, MessageSlicer.isHandledMessage(new Object()));
+
+        try (MessageSlicer slicer = newMessageSlicer("testHandledMessages", 100)) {
+            assertEquals("handledMessage", Boolean.TRUE, slicer.handleMessage(reply));
+            assertEquals("handledMessage", Boolean.FALSE, slicer.handleMessage(new Object()));
+        }
+    }
+
+    @Test
+    public void testSliceWithFailedSerialization() throws IOException {
+        IOException mockFailure = new IOException("mock IOException");
+        doThrow(mockFailure).when(mockFiledBackedStream).write(any(byte[].class), anyInt(), anyInt());
+        doThrow(mockFailure).when(mockFiledBackedStream).write(any(byte[].class));
+        doThrow(mockFailure).when(mockFiledBackedStream).write(anyInt());
+        doThrow(mockFailure).when(mockFiledBackedStream).flush();
+
+        try (MessageSlicer slicer = newMessageSlicer("testSliceWithFailedSerialization", 100)) {
+            slice(slicer, IDENTIFIER, new BytesMessage(new byte[]{}), testProbe.ref(), testProbe.ref(),
+                    mockOnFailureCallback);
+
+            assertFailureCallback(IOException.class);
+            verify(mockFiledBackedStream).cleanup();
+        }
+    }
+
+    @Test
+    public void testSliceWithByteSourceFailure() throws IOException {
+        IOException mockFailure = new IOException("mock IOException");
+        doThrow(mockFailure).when(mockByteSource).openStream();
+        doThrow(mockFailure).when(mockByteSource).openBufferedStream();
+
+        try (MessageSlicer slicer = newMessageSlicer("testSliceWithByteSourceFailure", 100)) {
+            slice(slicer, IDENTIFIER, new BytesMessage(new byte[]{}), testProbe.ref(), testProbe.ref(),
+                    mockOnFailureCallback);
+
+            assertFailureCallback(IOException.class);
+            verify(mockFiledBackedStream).cleanup();
+        }
+    }
+
+    @Test
+    public void testSliceWithInputStreamFailure() throws IOException {
+        doReturn(0).when(mockInputStream).read(any(byte[].class));
+
+        try (MessageSlicer slicer = newMessageSlicer("testSliceWithInputStreamFailure", 2)) {
+            slice(slicer, IDENTIFIER, new BytesMessage(new byte[]{}), testProbe.ref(), testProbe.ref(),
+                    mockOnFailureCallback);
+
+            assertFailureCallback(IOException.class);
+            verify(mockFiledBackedStream).cleanup();
+        }
+    }
+
+    @Test
+    public void testMessageSliceReplyWithNoState() {
+        try (MessageSlicer slicer = newMessageSlicer("testMessageSliceReplyWithNoState", 1000)) {
+            slicer.handleMessage(MessageSliceReply.success(IDENTIFIER, 1, testProbe.ref()));
+            final AbortSlicing abortSlicing = testProbe.expectMsgClass(AbortSlicing.class);
+            assertEquals("Identifier", IDENTIFIER, abortSlicing.getIdentifier());
+        }
+    }
+
+    @Test
+    public void testCloseAllSlicedMessageState() throws IOException {
+        doReturn(1).when(mockInputStream).read(any(byte[].class));
+
+        final MessageSlicer slicer = newMessageSlicer("testCloseAllSlicedMessageState", 1);
+        slice(slicer, IDENTIFIER, new BytesMessage(new byte[]{1, 2}), testProbe.ref(), testProbe.ref(),
+                mockOnFailureCallback);
+
+        slicer.close();
+
+        verify(mockFiledBackedStream).cleanup();
+        verifyNoMoreInteractions(mockOnFailureCallback);
+    }
+
+    @Test
+    public void testCheckExpiredSlicedMessageState() throws IOException {
+        doReturn(1).when(mockInputStream).read(any(byte[].class));
+
+        final int expiryDuration = 200;
+        try (MessageSlicer slicer = MessageSlicer.builder().messageSliceSize(1)
+                .logContext("testCheckExpiredSlicedMessageState")
+                .filedBackedStreamFactory(mockFiledBackedStreamFactory)
+                .expireStateAfterInactivity(expiryDuration, TimeUnit.MILLISECONDS).build()) {
+            slice(slicer, IDENTIFIER, new BytesMessage(new byte[]{1, 2}), testProbe.ref(), testProbe.ref(),
+                    mockOnFailureCallback);
+
+            Uninterruptibles.sleepUninterruptibly(expiryDuration + 50, TimeUnit.MILLISECONDS);
+            slicer.checkExpiredSlicedMessageState();
+
+            assertFailureCallback(RuntimeException.class);
+            verify(mockFiledBackedStream).cleanup();
+        }
+    }
+
+    private void assertFailureCallback(final Class<?> exceptionType) {
+        ArgumentCaptor<Throwable> exceptionCaptor = ArgumentCaptor.forClass(Throwable.class);
+        verify(mockOnFailureCallback).accept(exceptionCaptor.capture());
+        assertEquals("Exception type", exceptionType, exceptionCaptor.getValue().getClass());
+    }
+
+    private MessageSlicer newMessageSlicer(String logContext, final int messageSliceSize) {
+        return MessageSlicer.builder().messageSliceSize(messageSliceSize).logContext(logContext)
+                .filedBackedStreamFactory(mockFiledBackedStreamFactory).build();
+    }
+
+    static void slice(MessageSlicer slicer, Identifier identifier, Serializable message, ActorRef sendTo,
+            ActorRef replyTo, Consumer<Throwable> onFailureCallback) {
+        slicer.slice(SliceOptions.builder().identifier(identifier).message(message).sendTo(sendTo).replyTo(replyTo)
+                .onFailureCallback(onFailureCallback).build());
+    }
+}
diff --git a/opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/cluster/messaging/MessageSlicingIntegrationTest.java b/opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/cluster/messaging/MessageSlicingIntegrationTest.java
new file mode 100644 (file)
index 0000000..a490a24
--- /dev/null
@@ -0,0 +1,341 @@
+/*
+ * Copyright (c) 2017 Inocybe Technologies and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.messaging;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.verify;
+import static org.opendaylight.controller.cluster.messaging.MessageSlicerTest.slice;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.testkit.JavaTestKit;
+import akka.testkit.TestProbe;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectOutputStream;
+import java.util.Arrays;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+import org.apache.commons.lang3.SerializationUtils;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.opendaylight.controller.cluster.io.FileBackedOutputStream;
+import org.opendaylight.controller.cluster.io.FileBackedOutputStreamFactory;
+import org.opendaylight.yangtools.concepts.Identifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * End-to-end integration tests for message slicing.
+ *
+ * @author Thomas Pantelis
+ */
+public class MessageSlicingIntegrationTest {
+    private static final Logger LOG = LoggerFactory.getLogger(MessageSlicingIntegrationTest.class);
+
+    private static final ActorSystem ACTOR_SYSTEM = ActorSystem.create("test");
+    private static final FileBackedOutputStreamFactory FILE_BACKED_STREAM_FACTORY =
+            new FileBackedOutputStreamFactory(1000000000, "target");
+    private static final Identifier IDENTIFIER = new StringIdentifier("stringId");
+    private static final int DONT_CARE = -1;
+
+    private final TestProbe sendToProbe = TestProbe.apply(ACTOR_SYSTEM);
+    private final TestProbe replyToProbe = TestProbe.apply(ACTOR_SYSTEM);
+
+    @SuppressWarnings("unchecked")
+    private final Consumer<Throwable> mockOnFailureCallback = mock(Consumer.class);
+
+    @SuppressWarnings("unchecked")
+    private final BiConsumer<Object, ActorRef> mockAssembledMessageCallback = mock(BiConsumer.class);
+
+    private final MessageAssembler assembler = MessageAssembler.builder()
+            .assembledMessageCallback(mockAssembledMessageCallback).logContext("test")
+            .filedBackedStreamFactory(FILE_BACKED_STREAM_FACTORY).build();
+
+    @Before
+    public void setup() {
+        doNothing().when(mockOnFailureCallback).accept(any(Throwable.class));
+        doNothing().when(mockAssembledMessageCallback).accept(any(Object.class), any(ActorRef.class));
+    }
+
+    @After
+    public void tearDown() {
+        assembler.close();
+    }
+
+    @AfterClass
+    public static void staticTearDown() {
+        JavaTestKit.shutdownActorSystem(ACTOR_SYSTEM, Boolean.TRUE);
+    }
+
+    @Test
+    public void testSlicingWithChunks() throws IOException {
+        LOG.info("testSlicingWithChunks starting");
+
+        // First slice a message where the messageSliceSize divides evenly into the serialized size.
+
+        byte[] emptyMessageBytes = SerializationUtils.serialize(new BytesMessage(new byte[]{}));
+        int messageSliceSize = 10;
+        int expTotalSlices = emptyMessageBytes.length / messageSliceSize;
+        ByteArrayOutputStream byteStream = new ByteArrayOutputStream();
+        if (emptyMessageBytes.length % messageSliceSize > 0) {
+            expTotalSlices++;
+            int padding = messageSliceSize - emptyMessageBytes.length % messageSliceSize;
+            byte value = 1;
+            for (int i = 0; i < padding; i++, value++) {
+                byteStream.write(value);
+            }
+        }
+
+        testSlicing("testSlicingWithChunks", messageSliceSize, expTotalSlices, byteStream.toByteArray());
+
+        // Now slice a message where the messageSliceSize doesn't divide evenly.
+
+        byteStream.write(new byte[]{100, 101, 102});
+        testSlicing("testSlicingWithChunks", messageSliceSize, expTotalSlices + 1, byteStream.toByteArray());
+
+        LOG.info("testSlicingWithChunks ending");
+    }
+
+    @Test
+    public void testSingleSlice() {
+        LOG.info("testSingleSlice starting");
+
+        // Slice a message where the serialized size is equal to the messageSliceSize. In this case it should
+        // just send the original message.
+
+        final BytesMessage message = new BytesMessage(new byte[]{1, 2, 3});
+        try (MessageSlicer slicer = newMessageSlicer("testSingleSlice", SerializationUtils.serialize(message).length)) {
+            slice(slicer, IDENTIFIER, message, sendToProbe.ref(), replyToProbe.ref(), mockOnFailureCallback);
+
+            final BytesMessage sentMessage = sendToProbe.expectMsgClass(BytesMessage.class);
+            assertEquals("Sent message", message, sentMessage);
+        }
+
+        LOG.info("testSingleSlice ending");
+    }
+
+    @Test
+    public void testSlicingWithRetry() {
+        LOG.info("testSlicingWithRetry starting");
+
+        final BytesMessage message = new BytesMessage(new byte[]{1, 2, 3});
+        final int messageSliceSize = SerializationUtils.serialize(message).length / 2;
+        try (MessageSlicer slicer = newMessageSlicer("testSlicingWithRetry", messageSliceSize)) {
+            slice(slicer, IDENTIFIER, message, sendToProbe.ref(), replyToProbe.ref(), mockOnFailureCallback);
+
+            MessageSlice sliceMessage = sendToProbe.expectMsgClass(MessageSlice.class);
+            assembler.handleMessage(sliceMessage, sendToProbe.ref());
+
+            // Swallow the reply and send the MessageSlice again - it should return a failed reply.
+            replyToProbe.expectMsgClass(MessageSliceReply.class);
+            assembler.handleMessage(sliceMessage, sendToProbe.ref());
+
+            final MessageSliceReply failedReply = replyToProbe.expectMsgClass(MessageSliceReply.class);
+            assertFailedMessageSliceReply(failedReply, IDENTIFIER, true);
+
+            // Send the failed reply - slicing should be retried from the beginning.
+
+            slicer.handleMessage(failedReply);
+            while (true) {
+                sliceMessage = sendToProbe.expectMsgClass(MessageSlice.class);
+                assembler.handleMessage(sliceMessage, sendToProbe.ref());
+
+                final MessageSliceReply reply = replyToProbe.expectMsgClass(MessageSliceReply.class);
+                assertSuccessfulMessageSliceReply(reply, IDENTIFIER, sliceMessage.getSliceIndex());
+                slicer.handleMessage(reply);
+
+                if (reply.getSliceIndex() == sliceMessage.getTotalSlices()) {
+                    break;
+                }
+            }
+
+            assertAssembledMessage(message, replyToProbe.ref());
+        }
+
+        LOG.info("testSlicingWithRetry ending");
+    }
+
+    @Test
+    public void testSlicingWithMaxRetriesReached() {
+        LOG.info("testSlicingWithMaxRetriesReached starting");
+
+        final BytesMessage message = new BytesMessage(new byte[]{1, 2, 3});
+        final int messageSliceSize = SerializationUtils.serialize(message).length / 2;
+        try (MessageSlicer slicer = newMessageSlicer("testSlicingWithMaxRetriesReached", messageSliceSize)) {
+            slice(slicer, IDENTIFIER, message, sendToProbe.ref(), replyToProbe.ref(), mockOnFailureCallback);
+
+            Identifier slicingId = null;
+            for (int i = 0; i < MessageSlicer.DEFAULT_MAX_SLICING_TRIES; i++) {
+                MessageSlice sliceMessage = sendToProbe.expectMsgClass(MessageSlice.class);
+                slicingId = sliceMessage.getIdentifier();
+                assertMessageSlice(sliceMessage, IDENTIFIER, 1, DONT_CARE, SlicedMessageState.INITIAL_SLICE_HASH_CODE,
+                        replyToProbe.ref());
+                assembler.handleMessage(sliceMessage, sendToProbe.ref());
+
+                // Swallow the reply and send the MessageSlicer a reply with an invalid index.
+                final MessageSliceReply reply = replyToProbe.expectMsgClass(MessageSliceReply.class);
+                assertSuccessfulMessageSliceReply(reply, IDENTIFIER, sliceMessage.getSliceIndex());
+                slicer.handleMessage(MessageSliceReply.success(reply.getIdentifier(), 100000, reply.getSendTo()));
+
+                final AbortSlicing abortSlicing = sendToProbe.expectMsgClass(AbortSlicing.class);
+                assertEquals("Identifier", slicingId, abortSlicing.getIdentifier());
+                assembler.handleMessage(abortSlicing, sendToProbe.ref());
+            }
+
+            slicer.handleMessage(MessageSliceReply.success(slicingId, 100000, sendToProbe.ref()));
+
+            assertFailureCallback(RuntimeException.class);
+
+            assertFalse("MessageSlicer did not remove state for " + slicingId, slicer.hasState(slicingId));
+            assertFalse("MessageAssembler did not remove state for " + slicingId, assembler.hasState(slicingId));
+        }
+
+        LOG.info("testSlicingWithMaxRetriesReached ending");
+    }
+
+    @Test
+    public void testSlicingWithFailure() {
+        LOG.info("testSlicingWithFailure starting");
+
+        final BytesMessage message = new BytesMessage(new byte[]{1, 2, 3});
+        final int messageSliceSize = SerializationUtils.serialize(message).length / 2;
+        try (MessageSlicer slicer = newMessageSlicer("testSlicingWithFailure", messageSliceSize)) {
+            slice(slicer, IDENTIFIER, message, sendToProbe.ref(), replyToProbe.ref(), mockOnFailureCallback);
+
+            MessageSlice sliceMessage = sendToProbe.expectMsgClass(MessageSlice.class);
+
+            MessageSliceException failure = new MessageSliceException("mock failure",
+                    new IOException("mock IOException"));
+            slicer.handleMessage(MessageSliceReply.failed(sliceMessage.getIdentifier(), failure, sendToProbe.ref()));
+
+            assertFailureCallback(IOException.class);
+
+            assertFalse("MessageSlicer did not remove state for " + sliceMessage.getIdentifier(),
+                    slicer.hasState(sliceMessage.getIdentifier()));
+        }
+
+        LOG.info("testSlicingWithFailure ending");
+    }
+
+    @Test
+    public void testSliceWithFileBackedOutputStream() throws IOException {
+        LOG.info("testSliceWithFileBackedOutputStream starting");
+
+        final BytesMessage message = new BytesMessage(new byte[]{1, 2, 3});
+        FileBackedOutputStream fileBackedStream = FILE_BACKED_STREAM_FACTORY.newInstance();
+        try (ObjectOutputStream out = new ObjectOutputStream(fileBackedStream)) {
+            out.writeObject(message);
+        }
+
+        try (MessageSlicer slicer = newMessageSlicer("testSliceWithFileBackedOutputStream",
+                SerializationUtils.serialize(message).length)) {
+            slicer.slice(SliceOptions.builder().identifier(IDENTIFIER).fileBackedOutputStream(fileBackedStream)
+                    .sendTo(ACTOR_SYSTEM.actorSelection(sendToProbe.ref().path())).replyTo(replyToProbe.ref())
+                    .onFailureCallback(mockOnFailureCallback).build());
+
+            final MessageSlice sliceMessage = sendToProbe.expectMsgClass(MessageSlice.class);
+            assembler.handleMessage(sliceMessage, sendToProbe.ref());
+            assertAssembledMessage(message, replyToProbe.ref());
+        }
+
+        LOG.info("testSliceWithFileBackedOutputStream ending");
+    }
+
+    @SuppressWarnings("unchecked")
+    private void testSlicing(String logContext, int messageSliceSize, int expTotalSlices, byte[] messageData) {
+        reset(mockAssembledMessageCallback);
+
+        final BytesMessage message = new BytesMessage(messageData);
+
+        try (MessageSlicer slicer = newMessageSlicer(logContext, messageSliceSize)) {
+            slice(slicer, IDENTIFIER, message, sendToProbe.ref(), replyToProbe.ref(), mockOnFailureCallback);
+
+            Identifier slicingId = null;
+            int expLastSliceHashCode = SlicedMessageState.INITIAL_SLICE_HASH_CODE;
+            for (int sliceIndex = 1; sliceIndex <= expTotalSlices; sliceIndex++) {
+                final MessageSlice sliceMessage = sendToProbe.expectMsgClass(MessageSlice.class);
+                slicingId = sliceMessage.getIdentifier();
+                assertMessageSlice(sliceMessage, IDENTIFIER, sliceIndex, expTotalSlices, expLastSliceHashCode,
+                        replyToProbe.ref());
+
+                assembler.handleMessage(sliceMessage, sendToProbe.ref());
+
+                final MessageSliceReply reply = replyToProbe.expectMsgClass(MessageSliceReply.class);
+                assertSuccessfulMessageSliceReply(reply, IDENTIFIER, sliceIndex);
+
+                expLastSliceHashCode = Arrays.hashCode(sliceMessage.getData());
+
+                slicer.handleMessage(reply);
+            }
+
+            assertAssembledMessage(message, replyToProbe.ref());
+
+            assertFalse("MessageSlicer did not remove state for " + slicingId, slicer.hasState(slicingId));
+            assertFalse("MessageAssembler did not remove state for " + slicingId, assembler.hasState(slicingId));
+        }
+    }
+
+    private void assertFailureCallback(final Class<?> exceptionType) {
+        ArgumentCaptor<Throwable> exceptionCaptor = ArgumentCaptor.forClass(Throwable.class);
+        verify(mockOnFailureCallback).accept(exceptionCaptor.capture());
+        assertEquals("Exception type", exceptionType, exceptionCaptor.getValue().getClass());
+    }
+
+    private void assertAssembledMessage(final BytesMessage message, final ActorRef sender) {
+        assertAssembledMessage(mockAssembledMessageCallback, message, sender);
+    }
+
+    static void assertAssembledMessage(final BiConsumer<Object, ActorRef> mockAssembledMessageCallback,
+            final BytesMessage message, final ActorRef sender) {
+        ArgumentCaptor<Object> assembledMessageCaptor = ArgumentCaptor.forClass(Object.class);
+        ArgumentCaptor<ActorRef> senderActorRefCaptor = ArgumentCaptor.forClass(ActorRef.class);
+        verify(mockAssembledMessageCallback).accept(assembledMessageCaptor.capture(), senderActorRefCaptor.capture());
+        assertEquals("Assembled message", message, assembledMessageCaptor.getValue());
+        assertEquals("Sender ActorRef", sender, senderActorRefCaptor.getValue());
+    }
+
+    static void assertSuccessfulMessageSliceReply(MessageSliceReply reply, Identifier identifier, int sliceIndex) {
+        assertEquals("Identifier", identifier, ((MessageSliceIdentifier)reply.getIdentifier())
+                .getClientIdentifier());
+        assertEquals("SliceIndex", sliceIndex, reply.getSliceIndex());
+    }
+
+    static void assertFailedMessageSliceReply(MessageSliceReply reply, Identifier identifier, boolean isRetriable) {
+        assertEquals("Identifier", identifier, ((MessageSliceIdentifier)reply.getIdentifier())
+                .getClientIdentifier());
+        assertEquals("Failure present", Boolean.TRUE, reply.getFailure().isPresent());
+        assertEquals("isRetriable", isRetriable, reply.getFailure().get().isRetriable());
+    }
+
+    static void assertMessageSlice(MessageSlice sliceMessage, Identifier identifier, int sliceIndex, int totalSlices,
+            int lastSliceHashCode, ActorRef replyTo) {
+        assertEquals("Identifier", identifier, ((MessageSliceIdentifier)sliceMessage.getIdentifier())
+                .getClientIdentifier());
+        assertEquals("SliceIndex", sliceIndex, sliceMessage.getSliceIndex());
+        assertEquals("LastSliceHashCode", lastSliceHashCode, sliceMessage.getLastSliceHashCode());
+        assertEquals("ReplyTo", replyTo, sliceMessage.getReplyTo());
+
+        if (totalSlices != DONT_CARE) {
+            assertEquals("TotalSlices", totalSlices, sliceMessage.getTotalSlices());
+        }
+    }
+
+    private MessageSlicer newMessageSlicer(String logContext, final int messageSliceSize) {
+        return MessageSlicer.builder().messageSliceSize(messageSliceSize).logContext(logContext)
+                .filedBackedStreamFactory(FILE_BACKED_STREAM_FACTORY).build();
+    }
+}
diff --git a/opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/cluster/messaging/StringIdentifier.java b/opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/cluster/messaging/StringIdentifier.java
new file mode 100644 (file)
index 0000000..6686e01
--- /dev/null
@@ -0,0 +1,23 @@
+/*
+ * Copyright (c) 2017 Inocybe Technologies and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.messaging;
+
+import org.opendaylight.yangtools.util.AbstractStringIdentifier;
+
+/**
+ * Identifier that stores a string.
+ *
+ * @author Thomas Pantelis
+ */
+public class StringIdentifier extends AbstractStringIdentifier<StringIdentifier> {
+    private static final long serialVersionUID = 1L;
+
+    public StringIdentifier(String string) {
+        super(string);
+    }
+}