Bug 7449: Add message slicing/re-assembly classes
authorTom Pantelis <tompantelis@gmail.com>
Thu, 20 Apr 2017 13:10:05 +0000 (09:10 -0400)
committerRobert Varga <nite@hq.sk>
Tue, 20 Jun 2017 09:27:22 +0000 (09:27 +0000)
Added re-usable classes for slicing messages into smaller chunks and
re-assembling them. The process is similar to the current raft
install snapshot chunking.

The 2 main classes are MessageSlicer and MessageAssembler. The basic workflow
is:

 - MessageSlicer#slice method is called which serializes the message,
   creates and caches a SlicedMessageState instance and determines the number
   of slices based on the serialized size and the maximum message slice size,
   then sends the first MessageSlice message.
 - The MessageAssembler on the other end receives the MessageSlice, creates an
   AssembledMessageState instance if necessary and appends the byte[] chunk to
   the assembled stream. A MessageSliceReply is returned.
 - The MessageSlicer receives the MessageSliceReply and sends the next
   MessageSlice chunk, if necessary.
 - Once the last MessageSlice chunk is received by the MessageAssembler, it
   re-assembles the original message by de-serializing the assembled stream
   and notifies the user-supplied callback (of type Consumer<Object>) to handle
   the message.

Both MessageSlicer and MessageAssembler use a guava Cache and can be configured
to evict state that has been inactive for a period of time, ie if a message hasn't
been received by the other end.

The MessageSliceReply can propagate a MessageSliceException. If the
MessageSliceException indicates it's re-triable, the MessageSlicer will restart
slicing from the beginning. Otherwise slicing is aborted and the user-supplied
failure callback is notified.

Change-Id: Iceea212b12f49c3944bade50afded92244e4b31a
Signed-off-by: Tom Pantelis <tompantelis@gmail.com>
13 files changed:
java/org/opendaylight/controller/cluster/io/FileBackedOutputStreamFactory.java [new file with mode: 0644]
java/org/opendaylight/controller/cluster/messaging/AbortSlicing.java [new file with mode: 0644]
java/org/opendaylight/controller/cluster/messaging/AssembledMessageState.java [new file with mode: 0644]
java/org/opendaylight/controller/cluster/messaging/AssemblerClosedException.java [new file with mode: 0644]
java/org/opendaylight/controller/cluster/messaging/AssemblerSealedException.java [new file with mode: 0644]
java/org/opendaylight/controller/cluster/messaging/MessageAssembler.java [new file with mode: 0644]
java/org/opendaylight/controller/cluster/messaging/MessageSlice.java [new file with mode: 0644]
java/org/opendaylight/controller/cluster/messaging/MessageSliceException.java [new file with mode: 0644]
java/org/opendaylight/controller/cluster/messaging/MessageSliceIdentifier.java [new file with mode: 0644]
java/org/opendaylight/controller/cluster/messaging/MessageSliceReply.java [new file with mode: 0644]
java/org/opendaylight/controller/cluster/messaging/MessageSlicer.java [new file with mode: 0644]
java/org/opendaylight/controller/cluster/messaging/SliceOptions.java [new file with mode: 0644]
java/org/opendaylight/controller/cluster/messaging/SlicedMessageState.java [new file with mode: 0644]

diff --git a/java/org/opendaylight/controller/cluster/io/FileBackedOutputStreamFactory.java b/java/org/opendaylight/controller/cluster/io/FileBackedOutputStreamFactory.java
new file mode 100644 (file)
index 0000000..0cd4be6
--- /dev/null
@@ -0,0 +1,42 @@
+/*
+ * Copyright (c) 2017 Inocybe Technologies and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.io;
+
+import javax.annotation.Nullable;
+
+/**
+ * A factory for creating {@link FileBackedOutputStream} instances.
+ *
+ * @author Thomas Pantelis
+ * @see FileBackedOutputStream
+ */
+public class FileBackedOutputStreamFactory {
+    private final int fileThreshold;
+    private final String fileDirectory;
+
+    /**
+     * Constructor.
+     *
+     * @param fileThreshold the number of bytes before streams should switch to buffering to a file
+     * @param fileDirectory the directory in which to create files if needed. If null, the default temp file
+     *                      location is used.
+     */
+    public FileBackedOutputStreamFactory(final int fileThreshold, final @Nullable String fileDirectory) {
+        this.fileThreshold = fileThreshold;
+        this.fileDirectory = fileDirectory;
+    }
+
+    /**
+     * Creates a new {@link FileBackedOutputStream} with the settings configured for this factory.
+     *
+     * @return a {@link FileBackedOutputStream} instance
+     */
+    public FileBackedOutputStream newInstance() {
+        return new FileBackedOutputStream(fileThreshold, fileDirectory);
+    }
+}
diff --git a/java/org/opendaylight/controller/cluster/messaging/AbortSlicing.java b/java/org/opendaylight/controller/cluster/messaging/AbortSlicing.java
new file mode 100644 (file)
index 0000000..b136ed7
--- /dev/null
@@ -0,0 +1,74 @@
+/*
+ * Copyright (c) 2017 Inocybe Technologies and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.messaging;
+
+import com.google.common.base.Preconditions;
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.io.Serializable;
+import org.opendaylight.yangtools.concepts.Identifier;
+
+/**
+ * Message sent to abort slicing.
+ *
+ * @author Thomas Pantelis
+ */
+class AbortSlicing implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    private final Identifier identifier;
+
+    AbortSlicing(final Identifier identifier) {
+        this.identifier = Preconditions.checkNotNull(identifier);
+    }
+
+    Identifier getIdentifier() {
+        return identifier;
+    }
+
+    @Override
+    public String toString() {
+        return "AbortSlicing [identifier=" + identifier + "]";
+    }
+
+    private Object writeReplace() {
+        return new Proxy(this);
+    }
+
+    private static class Proxy implements Externalizable {
+        private static final long serialVersionUID = 1L;
+
+        private AbortSlicing abortSlicing;
+
+        // checkstyle flags the public modifier as redundant which really doesn't make sense since it clearly isn't
+        // redundant. It is explicitly needed for Java serialization to be able to create instances via reflection.
+        @SuppressWarnings("checkstyle:RedundantModifier")
+        public Proxy() {
+        }
+
+        Proxy(AbortSlicing abortSlicing) {
+            this.abortSlicing = abortSlicing;
+        }
+
+        @Override
+        public void writeExternal(ObjectOutput out) throws IOException {
+            out.writeObject(abortSlicing.identifier);
+        }
+
+        @Override
+        public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+            abortSlicing = new AbortSlicing((Identifier) in.readObject());
+        }
+
+        private Object readResolve() {
+            return abortSlicing;
+        }
+    }
+}
diff --git a/java/org/opendaylight/controller/cluster/messaging/AssembledMessageState.java b/java/org/opendaylight/controller/cluster/messaging/AssembledMessageState.java
new file mode 100644 (file)
index 0000000..16c73c7
--- /dev/null
@@ -0,0 +1,171 @@
+/*
+ * Copyright (c) 2017 Inocybe Technologies and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.messaging;
+
+import com.google.common.base.Preconditions;
+import com.google.common.io.ByteSource;
+import java.io.BufferedOutputStream;
+import java.io.IOException;
+import java.util.Arrays;
+import javax.annotation.concurrent.NotThreadSafe;
+import org.opendaylight.controller.cluster.io.FileBackedOutputStream;
+import org.opendaylight.controller.cluster.io.FileBackedOutputStreamFactory;
+import org.opendaylight.yangtools.concepts.Identifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Maintains the state of an assembled message.
+ *
+ * @author Thomas Pantelis
+ */
+@NotThreadSafe
+public class AssembledMessageState implements AutoCloseable {
+    private static final Logger LOG = LoggerFactory.getLogger(AssembledMessageState.class);
+
+    private final int totalSlices;
+    private final BufferedOutputStream bufferedStream;
+    private final FileBackedOutputStream fileBackedStream;
+    private final Identifier identifier;
+    private final String logContext;
+
+    private int lastSliceIndexReceived = SlicedMessageState.FIRST_SLICE_INDEX - 1;
+    private int lastSliceHashCodeReceived = SlicedMessageState.INITIAL_SLICE_HASH_CODE;
+    private boolean sealed = false;
+    private boolean closed = false;
+    private long assembledSize;
+
+    /**
+     * Constructor.
+     *
+     * @param identifier the identifier for this instance
+     * @param totalSlices the total number of slices to expect
+     * @param fileBackedStreamFactory factory for creating the FileBackedOutputStream instance used for streaming
+     * @param logContext the context for log messages
+     */
+    public AssembledMessageState(final Identifier identifier, final int totalSlices,
+            final FileBackedOutputStreamFactory fileBackedStreamFactory, final String logContext) {
+        this.identifier = identifier;
+        this.totalSlices = totalSlices;
+        this.logContext = logContext;
+
+        fileBackedStream = fileBackedStreamFactory.newInstance();
+        bufferedStream = new BufferedOutputStream(fileBackedStream);
+    }
+
+    /**
+     * Returns the identifier of this instance.
+     *
+     * @return the identifier
+     */
+    public Identifier getIdentifier() {
+        return identifier;
+    }
+
+    /**
+     * Adds a slice to the assembled stream.
+     *
+     * @param sliceIndex the index of the slice
+     * @param data the sliced data
+     * @param lastSliceHashCode the hash code of the last slice sent
+     * @return true if this is the last slice received, false otherwise
+     * @throws MessageSliceException
+     *         <ul>
+     *         <li>if the slice index is invalid</li>
+     *         <li>if the last slice hash code is invalid</li>
+     *         <li>if an error occurs writing the data to the stream</li>
+     *         </ul>
+     *         In addition, this instance is automatically closed and can no longer be used.
+     * @throws AssemblerSealedException if this instance is already sealed (ie has received all the slices)
+     * @throws AssemblerClosedException if this instance is already closed
+     */
+    public boolean addSlice(final int sliceIndex, final byte[] data, final int lastSliceHashCode)
+            throws MessageSliceException {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("{}: addSlice: identifier: {}, sliceIndex: {}, lastSliceIndex: {}, assembledSize: {}, "
+                    + "sliceHashCode: {}, lastSliceHashCode: {}", logContext, identifier, sliceIndex,
+                    lastSliceIndexReceived, assembledSize, lastSliceHashCode, lastSliceHashCodeReceived);
+        }
+
+        try {
+            validateSlice(sliceIndex, lastSliceHashCode);
+
+            assembledSize += data.length;
+            lastSliceIndexReceived = sliceIndex;
+            lastSliceHashCodeReceived = Arrays.hashCode(data);
+
+            bufferedStream.write(data);
+
+            sealed = sliceIndex == totalSlices;
+            if (sealed) {
+                bufferedStream.close();
+            }
+        } catch (IOException e) {
+            close();
+            throw new MessageSliceException(String.format("Error writing data for slice %d of message %s",
+                    sliceIndex, identifier), e);
+        }
+
+        return sealed;
+    }
+
+    /**
+     * Returns the assembled bytes as a ByteSource. This method must only be called after this instance is sealed.
+     *
+     * @return a ByteSource containing the assembled bytes
+     * @throws IOException if an error occurs obtaining the assembled bytes
+     * @throws IllegalStateException is this instance is not sealed
+     */
+    public ByteSource getAssembledBytes() throws IOException {
+        Preconditions.checkState(sealed, "Last slice not received yet");
+        return fileBackedStream.asByteSource();
+    }
+
+    private void validateSlice(final int sliceIndex, final int lastSliceHashCode) throws MessageSliceException {
+        if (closed) {
+            throw new AssemblerClosedException(identifier);
+        }
+
+        if (sealed) {
+            throw new AssemblerSealedException(String.format(
+                    "Received slice index for message %s but all %d expected slices have already already received.",
+                    identifier, totalSlices));
+        }
+
+        if (lastSliceIndexReceived + 1 != sliceIndex) {
+            close();
+            throw new MessageSliceException(String.format("Expected sliceIndex %d but got %d for message %s",
+                    lastSliceIndexReceived + 1, sliceIndex, identifier), true);
+        }
+
+        if (lastSliceHashCode != lastSliceHashCodeReceived) {
+            close();
+            throw new MessageSliceException(String.format("The hash code of the recorded last slice (%d) does not "
+                    + "match the senders last hash code (%d) for message %s", lastSliceHashCodeReceived,
+                    lastSliceHashCode, identifier), true);
+        }
+    }
+
+    @Override
+    public void close() {
+        if (closed) {
+            return;
+        }
+
+        closed = true;
+        if (!sealed) {
+            try {
+                bufferedStream.close();
+            } catch (IOException e) {
+                LOG.debug("{}: Error closing output stream", logContext, e);
+            }
+        }
+
+        fileBackedStream.cleanup();
+    }
+}
diff --git a/java/org/opendaylight/controller/cluster/messaging/AssemblerClosedException.java b/java/org/opendaylight/controller/cluster/messaging/AssemblerClosedException.java
new file mode 100644 (file)
index 0000000..83c8dcb
--- /dev/null
@@ -0,0 +1,28 @@
+/*
+ * Copyright (c) 2017 Inocybe Technologies and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.messaging;
+
+import org.opendaylight.yangtools.concepts.Identifier;
+
+/**
+ * A MessageSliceException indicating the message assembler has already been closed.
+ *
+ * @author Thomas Pantelis
+ */
+public class AssemblerClosedException extends MessageSliceException {
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * Constructs an instance.
+     *
+     * @param identifier the identifier whose state was closed
+     */
+    public AssemblerClosedException(final Identifier identifier) {
+        super(String.format("Message assembler for %s has already been closed", identifier), false);
+    }
+}
diff --git a/java/org/opendaylight/controller/cluster/messaging/AssemblerSealedException.java b/java/org/opendaylight/controller/cluster/messaging/AssemblerSealedException.java
new file mode 100644 (file)
index 0000000..df9ac63
--- /dev/null
@@ -0,0 +1,26 @@
+/*
+ * Copyright (c) 2017 Inocybe Technologies and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.messaging;
+
+/**
+ * A MessageSliceException indicating the message assembler has already been sealed.
+ *
+ * @author Thomas Pantelis
+ */
+public class AssemblerSealedException extends MessageSliceException {
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * Constructs an instance.
+     *
+     * @param message he detail message
+     */
+    public AssemblerSealedException(String message) {
+        super(message, false);
+    }
+}
diff --git a/java/org/opendaylight/controller/cluster/messaging/MessageAssembler.java b/java/org/opendaylight/controller/cluster/messaging/MessageAssembler.java
new file mode 100644 (file)
index 0000000..71b07f4
--- /dev/null
@@ -0,0 +1,286 @@
+/*
+ * Copyright (c) 2017 Inocybe Technologies and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.messaging;
+
+import akka.actor.ActorRef;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
+import com.google.common.io.ByteSource;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiConsumer;
+import javax.annotation.Nonnull;
+import org.opendaylight.controller.cluster.io.FileBackedOutputStreamFactory;
+import org.opendaylight.yangtools.concepts.Identifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class re-assembles messages sliced into smaller chunks by {@link MessageSlicer}.
+ *
+ * @author Thomas Pantelis
+ * @see MessageSlicer
+ */
+public class MessageAssembler implements AutoCloseable {
+    private static final Logger LOG = LoggerFactory.getLogger(MessageAssembler.class);
+
+    private final Cache<Identifier, AssembledMessageState> stateCache;
+    private final FileBackedOutputStreamFactory filedBackedStreamFactory;
+    private final BiConsumer<Object, ActorRef> assembledMessageCallback;
+    private final String logContext;
+
+    private MessageAssembler(Builder builder) {
+        this.filedBackedStreamFactory = Preconditions.checkNotNull(builder.filedBackedStreamFactory,
+                "FiledBackedStreamFactory cannot be null");
+        this.assembledMessageCallback = Preconditions.checkNotNull(builder.assembledMessageCallback,
+                "assembledMessageCallback cannot be null");
+        this.logContext = builder.logContext;
+
+        stateCache = CacheBuilder.newBuilder()
+                .expireAfterAccess(builder.expireStateAfterInactivityDuration, builder.expireStateAfterInactivityUnit)
+                .removalListener((RemovalListener<Identifier, AssembledMessageState>) notification ->
+                    stateRemoved(notification)).build();
+    }
+
+    /**
+     * Returns a new Builder for creating MessageAssembler instances.
+     *
+     * @return a Builder instance
+     */
+    public static Builder builder() {
+        return new Builder();
+    }
+
+    /**
+     * Checks if the given message is handled by this class. If so, it should be forwarded to the
+     * {@link #handleMessage(Object, ActorRef)} method
+     *
+     * @param message the message to check
+     * @return true if handled, false otherwise
+     */
+    public static boolean isHandledMessage(Object message) {
+        return message instanceof MessageSlice || message instanceof AbortSlicing;
+    }
+
+    @Override
+    public void close() {
+        LOG.debug("{}: Closing", logContext);
+        stateCache.invalidateAll();
+    }
+
+    /**
+     * Checks for and removes assembled message state that has expired due to inactivity from the slicing component
+     * on the other end.
+     */
+    public void checkExpiredAssembledMessageState() {
+        if (stateCache.size() > 0) {
+            stateCache.cleanUp();
+        }
+    }
+
+    /**
+     * Invoked to handle message slices and other messages pertaining to this class.
+     *
+     * @param message the message
+     * @param sendTo the reference of the actor to which subsequent message slices should be sent
+     * @return true if the message was handled, false otherwise
+     */
+    public boolean handleMessage(final Object message, final @Nonnull ActorRef sendTo) {
+        if (message instanceof MessageSlice) {
+            LOG.debug("{}: handleMessage: {}", logContext, message);
+            onMessageSlice((MessageSlice) message, sendTo);
+            return true;
+        } else if (message instanceof AbortSlicing) {
+            LOG.debug("{}: handleMessage: {}", logContext, message);
+            onAbortSlicing((AbortSlicing) message);
+            return true;
+        }
+
+        return false;
+    }
+
+    private void onMessageSlice(final MessageSlice messageSlice, final ActorRef sendTo) {
+        final Identifier identifier = messageSlice.getIdentifier();
+        try {
+            final AssembledMessageState state = stateCache.get(identifier, () -> createState(messageSlice));
+            processMessageSliceForState(messageSlice, state, sendTo);
+        } catch (ExecutionException e) {
+            final MessageSliceException messageSliceEx;
+            final Throwable cause = e.getCause();
+            if (cause instanceof MessageSliceException) {
+                messageSliceEx = (MessageSliceException) cause;
+            } else {
+                messageSliceEx = new MessageSliceException(String.format(
+                        "Error creating state for identifier %s", identifier), cause);
+            }
+
+            messageSlice.getReplyTo().tell(MessageSliceReply.failed(identifier, messageSliceEx, sendTo),
+                    ActorRef.noSender());
+        }
+    }
+
+    private AssembledMessageState createState(final MessageSlice messageSlice) throws MessageSliceException {
+        final Identifier identifier = messageSlice.getIdentifier();
+        if (messageSlice.getSliceIndex() == SlicedMessageState.FIRST_SLICE_INDEX) {
+            LOG.debug("{}: Received first slice for {} - creating AssembledMessageState", logContext, identifier);
+            return new AssembledMessageState(identifier, messageSlice.getTotalSlices(),
+                    filedBackedStreamFactory, logContext);
+        }
+
+        LOG.debug("{}: AssembledMessageState not found for {} - returning failed reply", logContext, identifier);
+        throw new MessageSliceException(String.format(
+                "No assembled state found for identifier %s and slice index %s", identifier,
+                messageSlice.getSliceIndex()), true);
+    }
+
+    private void processMessageSliceForState(final MessageSlice messageSlice, AssembledMessageState state,
+            final ActorRef sendTo) {
+        final Identifier identifier = messageSlice.getIdentifier();
+        final ActorRef replyTo = messageSlice.getReplyTo();
+        Object reAssembledMessage = null;
+        synchronized (state) {
+            final int sliceIndex = messageSlice.getSliceIndex();
+            try {
+                final MessageSliceReply successReply = MessageSliceReply.success(identifier, sliceIndex, sendTo);
+                if (state.addSlice(sliceIndex, messageSlice.getData(), messageSlice.getLastSliceHashCode())) {
+                    LOG.debug("{}: Received last slice for {}", logContext, identifier);
+
+                    reAssembledMessage = reAssembleMessage(state);
+
+                    replyTo.tell(successReply, ActorRef.noSender());
+                    removeState(identifier);
+                } else {
+                    LOG.debug("{}: Added slice for {} - expecting more", logContext, identifier);
+                    replyTo.tell(successReply, ActorRef.noSender());
+                }
+            } catch (MessageSliceException e) {
+                LOG.warn("{}: Error processing {}", logContext, messageSlice, e);
+                replyTo.tell(MessageSliceReply.failed(identifier, e, sendTo), ActorRef.noSender());
+                removeState(identifier);
+            }
+        }
+
+        if (reAssembledMessage != null) {
+            LOG.debug("{}: Notifying callback of re-assembled message {}", logContext, reAssembledMessage);
+            assembledMessageCallback.accept(reAssembledMessage, replyTo);
+        }
+    }
+
+    private Object reAssembleMessage(final AssembledMessageState state) throws MessageSliceException {
+        try {
+            final ByteSource assembledBytes = state.getAssembledBytes();
+            try (ObjectInputStream in = new ObjectInputStream(assembledBytes.openStream())) {
+                return in.readObject();
+            }
+
+        } catch (IOException | ClassNotFoundException  e) {
+            throw new MessageSliceException(String.format("Error re-assembling bytes for identifier %s",
+                    state.getIdentifier()), e);
+        }
+    }
+
+    private void onAbortSlicing(AbortSlicing message) {
+        removeState(message.getIdentifier());
+    }
+
+    private void removeState(final Identifier identifier) {
+        LOG.debug("{}: Removing state for {}", logContext, identifier);
+        stateCache.invalidate(identifier);
+    }
+
+    private void stateRemoved(RemovalNotification<Identifier, AssembledMessageState> notification) {
+        if (notification.wasEvicted()) {
+            LOG.warn("{}: AssembledMessageState for {} was expired from the cache", logContext, notification.getKey());
+        } else {
+            LOG.debug("{}: AssembledMessageState for {} was removed from the cache due to {}", logContext,
+                    notification.getKey(), notification.getCause());
+        }
+
+        notification.getValue().close();
+    }
+
+    @VisibleForTesting
+    boolean hasState(Identifier forIdentifier) {
+        boolean exists = stateCache.getIfPresent(forIdentifier) != null;
+        stateCache.cleanUp();
+        return exists;
+    }
+
+    public static class Builder {
+        private FileBackedOutputStreamFactory filedBackedStreamFactory;
+        private BiConsumer<Object, ActorRef> assembledMessageCallback;
+        private long expireStateAfterInactivityDuration = 1;
+        private TimeUnit expireStateAfterInactivityUnit = TimeUnit.MINUTES;
+        private String logContext = "<no-context>";
+
+        /**
+         * Sets the factory for creating FileBackedOutputStream instances used for streaming messages.
+         *
+         * @param newFiledBackedStreamFactory the factory for creating FileBackedOutputStream instances
+         * @return this Builder
+         */
+        public Builder filedBackedStreamFactory(final FileBackedOutputStreamFactory newFiledBackedStreamFactory) {
+            this.filedBackedStreamFactory = Preconditions.checkNotNull(newFiledBackedStreamFactory);
+            return this;
+        }
+
+        /**
+         * Sets the Consumer callback for assembled messages. The callback takes the assembled message and the
+         * original sender ActorRef as arguments.
+         *
+         * @param newAssembledMessageCallback the Consumer callback
+         * @return this Builder
+         */
+        public Builder assembledMessageCallback(final BiConsumer<Object, ActorRef> newAssembledMessageCallback) {
+            this.assembledMessageCallback = newAssembledMessageCallback;
+            return this;
+        }
+
+        /**
+         * Sets the duration and time unit whereby assembled message state is purged from the cache due to
+         * inactivity from the slicing component on the other end. By default, state is purged after 1 minute of
+         * inactivity.
+         *
+         * @param duration the length of time after which a state entry is purged
+         * @param unit the unit the duration is expressed in
+         * @return this Builder
+         */
+        public Builder expireStateAfterInactivity(final long duration, final TimeUnit unit) {
+            Preconditions.checkArgument(duration > 0, "duration must be > 0");
+            this.expireStateAfterInactivityDuration = duration;
+            this.expireStateAfterInactivityUnit = unit;
+            return this;
+        }
+
+        /**
+         * Sets the context for log messages.
+         *
+         * @param newLogContext the log context
+         * @return this Builder
+         */
+        public Builder logContext(final String newLogContext) {
+            this.logContext = newLogContext;
+            return this;
+        }
+
+        /**
+         * Builds a new MessageAssembler instance.
+         *
+         * @return a new MessageAssembler
+         */
+        public MessageAssembler build() {
+            return new MessageAssembler(this);
+        }
+    }
+}
diff --git a/java/org/opendaylight/controller/cluster/messaging/MessageSlice.java b/java/org/opendaylight/controller/cluster/messaging/MessageSlice.java
new file mode 100644 (file)
index 0000000..00ab31d
--- /dev/null
@@ -0,0 +1,127 @@
+/*
+ * Copyright (c) 2017 Inocybe Technologies and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.messaging;
+
+import akka.actor.ActorRef;
+import akka.serialization.JavaSerializer;
+import akka.serialization.Serialization;
+import com.google.common.base.Preconditions;
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.io.Serializable;
+import org.opendaylight.yangtools.concepts.Identifier;
+
+/**
+ * Represents a sliced message chunk.
+ *
+ * @author Thomas Pantelis
+ */
+public class MessageSlice implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    private final Identifier identifier;
+    private final byte[] data;
+    private final int sliceIndex;
+    private final int totalSlices;
+    private final int lastSliceHashCode;
+    private final ActorRef replyTo;
+
+    public MessageSlice(Identifier identifier, byte[] data, int sliceIndex, int totalSlices, int lastSliceHashCode,
+            final ActorRef replyTo) {
+        this.identifier = Preconditions.checkNotNull(identifier);
+        this.data = Preconditions.checkNotNull(data);
+        this.sliceIndex = sliceIndex;
+        this.totalSlices = totalSlices;
+        this.lastSliceHashCode = lastSliceHashCode;
+        this.replyTo = Preconditions.checkNotNull(replyTo);
+    }
+
+    public Identifier getIdentifier() {
+        return identifier;
+    }
+
+    @SuppressFBWarnings(value = "EI_EXPOSE_REP", justification = "Exposes a mutable object stored in a field but "
+            + "this is OK since this class is merely a DTO and does not process the byte[] internally."
+            + "Also it would be inefficient to create a return copy as the byte[] could be large.")
+    public byte[] getData() {
+        return data;
+    }
+
+    public int getSliceIndex() {
+        return sliceIndex;
+    }
+
+    public int getTotalSlices() {
+        return totalSlices;
+    }
+
+    public int getLastSliceHashCode() {
+        return lastSliceHashCode;
+    }
+
+    public ActorRef getReplyTo() {
+        return replyTo;
+    }
+
+    @Override
+    public String toString() {
+        return "MessageSlice [identifier=" + identifier + ", data.length=" + data.length + ", sliceIndex="
+                + sliceIndex + ", totalSlices=" + totalSlices + ", lastSliceHashCode=" + lastSliceHashCode
+                + ", replyTo=" + replyTo + "]";
+    }
+
+    private Object writeReplace() {
+        return new Proxy(this);
+    }
+
+    private static class Proxy implements Externalizable {
+        private static final long serialVersionUID = 1L;
+
+        private MessageSlice messageSlice;
+
+        // checkstyle flags the public modifier as redundant which really doesn't make sense since it clearly isn't
+        // redundant. It is explicitly needed for Java serialization to be able to create instances via reflection.
+        @SuppressWarnings("checkstyle:RedundantModifier")
+        public Proxy() {
+        }
+
+        Proxy(MessageSlice messageSlice) {
+            this.messageSlice = messageSlice;
+        }
+
+        @Override
+        public void writeExternal(ObjectOutput out) throws IOException {
+            out.writeObject(messageSlice.identifier);
+            out.writeInt(messageSlice.sliceIndex);
+            out.writeInt(messageSlice.totalSlices);
+            out.writeInt(messageSlice.lastSliceHashCode);
+            out.writeObject(messageSlice.data);
+            out.writeObject(Serialization.serializedActorPath(messageSlice.replyTo));
+        }
+
+        @Override
+        public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+            Identifier identifier = (Identifier) in.readObject();
+            int sliceIndex = in.readInt();
+            int totalSlices = in.readInt();
+            int lastSliceHashCode = in.readInt();
+            byte[] data = (byte[])in.readObject();
+            ActorRef replyTo = JavaSerializer.currentSystem().value().provider()
+                    .resolveActorRef((String) in.readObject());
+
+            messageSlice = new MessageSlice(identifier, data, sliceIndex, totalSlices, lastSliceHashCode, replyTo);
+        }
+
+        private Object readResolve() {
+            return messageSlice;
+        }
+    }
+}
diff --git a/java/org/opendaylight/controller/cluster/messaging/MessageSliceException.java b/java/org/opendaylight/controller/cluster/messaging/MessageSliceException.java
new file mode 100644 (file)
index 0000000..09ee723
--- /dev/null
@@ -0,0 +1,50 @@
+/*
+ * Copyright (c) 2017 Inocybe Technologies and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.messaging;
+
+/**
+ * An exception indicating a message slice failure.
+ *
+ * @author Thomas Pantelis
+ */
+public class MessageSliceException extends Exception {
+    private static final long serialVersionUID = 1L;
+
+    private final boolean isRetriable;
+
+    /**
+     * Constructs an instance.
+     *
+     * @param message the detail message
+     * @param cause the cause
+     */
+    public MessageSliceException(final String message, final Throwable cause) {
+        super(message, cause);
+        isRetriable = false;
+    }
+
+    /**
+     * Constructs an instance.
+     *
+     * @param message the detail message
+     * @param isRetriable if true, indicates the original operation can be retried
+     */
+    public MessageSliceException(final String message, final boolean isRetriable) {
+        super(message);
+        this.isRetriable = isRetriable;
+    }
+
+    /**
+     * Returns whether or not the original operation can be retried.
+     *
+     * @return true if it can be retried, false otherwise
+     */
+    public boolean isRetriable() {
+        return isRetriable;
+    }
+}
diff --git a/java/org/opendaylight/controller/cluster/messaging/MessageSliceIdentifier.java b/java/org/opendaylight/controller/cluster/messaging/MessageSliceIdentifier.java
new file mode 100644 (file)
index 0000000..2d1147b
--- /dev/null
@@ -0,0 +1,105 @@
+/*
+ * Copyright (c) 2017 Inocybe Technologies and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.messaging;
+
+import com.google.common.base.Preconditions;
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.concurrent.atomic.AtomicLong;
+import org.opendaylight.yangtools.concepts.Identifier;
+
+/**
+ * Identifier for a message slice that is composed of a client-supplied Identifier and an internal counter value.
+ *
+ * @author Thomas Pantelis
+ */
+final class MessageSliceIdentifier implements Identifier {
+    private static final long serialVersionUID = 1L;
+    private static final AtomicLong ID_COUNTER = new AtomicLong(1);
+
+    private final Identifier clientIdentifier;
+    private final long messageId;
+
+    MessageSliceIdentifier(final Identifier clientIdentifier) {
+        this(clientIdentifier, ID_COUNTER.getAndIncrement());
+    }
+
+    private MessageSliceIdentifier(final Identifier clientIdentifier, final long messageId) {
+        this.clientIdentifier = Preconditions.checkNotNull(clientIdentifier);
+        this.messageId = messageId;
+    }
+
+    Identifier getClientIdentifier() {
+        return clientIdentifier;
+    }
+
+    @Override
+    public int hashCode() {
+        final int prime = 31;
+        int result = 1;
+        result = prime * result + clientIdentifier.hashCode();
+        result = prime * result + (int) (messageId ^ messageId >>> 32);
+        return result;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj) {
+            return true;
+        }
+
+        if (!(obj instanceof MessageSliceIdentifier)) {
+            return false;
+        }
+
+        MessageSliceIdentifier other = (MessageSliceIdentifier) obj;
+        return other.clientIdentifier.equals(clientIdentifier) && other.messageId == messageId;
+    }
+
+    @Override
+    public String toString() {
+        return "MessageSliceIdentifier [clientIdentifier=" + clientIdentifier + ", messageId=" + messageId + "]";
+    }
+
+    private Object writeReplace() {
+        return new Proxy(this);
+    }
+
+    private static class Proxy implements Externalizable {
+        private static final long serialVersionUID = 1L;
+
+        private MessageSliceIdentifier messageSliceId;
+
+        // checkstyle flags the public modifier as redundant which really doesn't make sense since it clearly isn't
+        // redundant. It is explicitly needed for Java serialization to be able to create instances via reflection.
+        @SuppressWarnings("checkstyle:RedundantModifier")
+        public Proxy() {
+        }
+
+        Proxy(MessageSliceIdentifier messageSliceId) {
+            this.messageSliceId = messageSliceId;
+        }
+
+        @Override
+        public void writeExternal(ObjectOutput out) throws IOException {
+            out.writeObject(messageSliceId.clientIdentifier);
+            out.writeLong(messageSliceId.messageId);
+        }
+
+        @Override
+        public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+            messageSliceId = new MessageSliceIdentifier((Identifier) in.readObject(), in.readLong());
+        }
+
+        private Object readResolve() {
+            return messageSliceId;
+        }
+    }
+}
diff --git a/java/org/opendaylight/controller/cluster/messaging/MessageSliceReply.java b/java/org/opendaylight/controller/cluster/messaging/MessageSliceReply.java
new file mode 100644 (file)
index 0000000..dc80c33
--- /dev/null
@@ -0,0 +1,116 @@
+/*
+ * Copyright (c) 2017 Inocybe Technologies and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.messaging;
+
+import akka.actor.ActorRef;
+import akka.serialization.JavaSerializer;
+import akka.serialization.Serialization;
+import com.google.common.base.Preconditions;
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.io.Serializable;
+import java.util.Optional;
+import org.opendaylight.yangtools.concepts.Identifier;
+
+/**
+ * The reply message for {@link MessageSlice}.
+ *
+ * @author Thomas Pantelis
+ */
+public class MessageSliceReply implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    private final Identifier identifier;
+    private final int sliceIndex;
+    private final MessageSliceException failure;
+    private final ActorRef sendTo;
+
+    private MessageSliceReply(final Identifier identifier, final int sliceIndex, final MessageSliceException failure,
+            final ActorRef sendTo) {
+        this.identifier = Preconditions.checkNotNull(identifier);
+        this.sliceIndex = sliceIndex;
+        this.sendTo = Preconditions.checkNotNull(sendTo);
+        this.failure = failure;
+    }
+
+    public static MessageSliceReply success(final Identifier identifier, final int sliceIndex, final ActorRef sendTo) {
+        return new MessageSliceReply(identifier, sliceIndex, null, sendTo);
+    }
+
+    public static MessageSliceReply failed(final Identifier identifier, final MessageSliceException failure,
+            final ActorRef sendTo) {
+        return new MessageSliceReply(identifier, -1, failure, sendTo);
+    }
+
+    public Identifier getIdentifier() {
+        return identifier;
+    }
+
+    public int getSliceIndex() {
+        return sliceIndex;
+    }
+
+    public ActorRef getSendTo() {
+        return sendTo;
+    }
+
+    public Optional<MessageSliceException> getFailure() {
+        return Optional.ofNullable(failure);
+    }
+
+    @Override
+    public String toString() {
+        return "MessageSliceReply [identifier=" + identifier + ", sliceIndex=" + sliceIndex + ", failure=" + failure
+                + ", sendTo=" + sendTo + "]";
+    }
+
+    private Object writeReplace() {
+        return new Proxy(this);
+    }
+
+    private static class Proxy implements Externalizable {
+        private static final long serialVersionUID = 1L;
+
+        private MessageSliceReply messageSliceReply;
+
+        // checkstyle flags the public modifier as redundant which really doesn't make sense since it clearly isn't
+        // redundant. It is explicitly needed for Java serialization to be able to create instances via reflection.
+        @SuppressWarnings("checkstyle:RedundantModifier")
+        public Proxy() {
+        }
+
+        Proxy(MessageSliceReply messageSliceReply) {
+            this.messageSliceReply = messageSliceReply;
+        }
+
+        @Override
+        public void writeExternal(ObjectOutput out) throws IOException {
+            out.writeObject(messageSliceReply.identifier);
+            out.writeInt(messageSliceReply.sliceIndex);
+            out.writeObject(messageSliceReply.failure);
+            out.writeObject(Serialization.serializedActorPath(messageSliceReply.sendTo));
+        }
+
+        @Override
+        public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+            final Identifier identifier = (Identifier) in.readObject();
+            final int sliceIndex = in.readInt();
+            final MessageSliceException failure = (MessageSliceException) in.readObject();
+            ActorRef sendTo = JavaSerializer.currentSystem().value().provider()
+                    .resolveActorRef((String) in.readObject());
+
+            messageSliceReply = new MessageSliceReply(identifier, sliceIndex, failure, sendTo);
+        }
+
+        private Object readResolve() {
+            return messageSliceReply;
+        }
+    }
+}
diff --git a/java/org/opendaylight/controller/cluster/messaging/MessageSlicer.java b/java/org/opendaylight/controller/cluster/messaging/MessageSlicer.java
new file mode 100644 (file)
index 0000000..484a5c2
--- /dev/null
@@ -0,0 +1,374 @@
+/*
+ * Copyright (c) 2017 Inocybe Technologies and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.messaging;
+
+import akka.actor.ActorRef;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
+import java.io.IOException;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+import org.opendaylight.controller.cluster.io.FileBackedOutputStream;
+import org.opendaylight.controller.cluster.io.FileBackedOutputStreamFactory;
+import org.opendaylight.yangtools.concepts.Identifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class slices messages into smaller chunks. {@link MessageAssembler} is used to re-assemble the messages.
+ *
+ * @author Thomas Pantelis
+ * @see MessageAssembler
+ */
+public class MessageSlicer implements AutoCloseable {
+    private static final Logger LOG = LoggerFactory.getLogger(MessageSlicer.class);
+    public static final int DEFAULT_MAX_SLICING_TRIES = 3;
+
+    private final Cache<Identifier, SlicedMessageState<ActorRef>> stateCache;
+    private final FileBackedOutputStreamFactory filedBackedStreamFactory;
+    private final int messageSliceSize;
+    private final int maxSlicingTries;
+    private final String logContext;
+
+    private MessageSlicer(Builder builder) {
+        this.filedBackedStreamFactory = builder.filedBackedStreamFactory;
+        this.messageSliceSize = builder.messageSliceSize;
+        this.maxSlicingTries = builder.maxSlicingTries;
+        this.logContext = builder.logContext;
+
+        CacheBuilder<Identifier, SlicedMessageState<ActorRef>> cacheBuilder = CacheBuilder.newBuilder().removalListener(
+                (RemovalListener<Identifier, SlicedMessageState<ActorRef>>) notification -> stateRemoved(notification));
+        if (builder.expireStateAfterInactivityDuration > 0) {
+            cacheBuilder = cacheBuilder.expireAfterAccess(builder.expireStateAfterInactivityDuration,
+                    builder.expireStateAfterInactivityUnit);
+        }
+
+        stateCache = cacheBuilder.build();
+    }
+
+    /**
+     * Returns a new Builder for creating MessageSlicer instances.
+     *
+     * @return a Builder instance
+     */
+    public static Builder builder() {
+        return new Builder();
+    }
+
+    /**
+     * Checks if the given message is handled by this class. If so, it should be forwarded to the
+     * {@link #handleMessage(Object)} method
+     *
+     * @param message the message to check
+     * @return true if handled, false otherwise
+     */
+    public static boolean isHandledMessage(Object message) {
+        return message instanceof MessageSliceReply;
+    }
+
+    /**
+     * Slices a message into chunks based on the serialized size, the maximum message slice size and the given
+     * options.
+     *
+     * @param options the SliceOptions
+     */
+    public void slice(SliceOptions options) {
+        final Identifier identifier = options.getIdentifier();
+        final Serializable message = options.getMessage();
+        final FileBackedOutputStream fileBackedStream;
+        if (message != null) {
+            LOG.debug("{}: slice: identifier: {}, message: {}", logContext, identifier, message);
+
+
+            Preconditions.checkNotNull(filedBackedStreamFactory,
+                    "The FiledBackedStreamFactory must be set in order to call this slice method");
+
+            // Serialize the message to a FileBackedOutputStream.
+            fileBackedStream = filedBackedStreamFactory.newInstance();
+            try (ObjectOutputStream out = new ObjectOutputStream(fileBackedStream)) {
+                out.writeObject(message);
+            } catch (IOException e) {
+                LOG.debug("{}: Error serializing message for {}", logContext, identifier, e);
+                fileBackedStream.cleanup();
+                options.getOnFailureCallback().accept(e);
+                return;
+            }
+        } else {
+            fileBackedStream = options.getFileBackedStream();
+        }
+
+        initializeSlicing(options, fileBackedStream);
+    }
+
+    private void initializeSlicing(final SliceOptions options, final FileBackedOutputStream fileBackedStream) {
+        final Identifier identifier = options.getIdentifier();
+        MessageSliceIdentifier messageSliceId = new MessageSliceIdentifier(identifier);
+        SlicedMessageState<ActorRef> state = null;
+        try {
+            state = new SlicedMessageState<>(messageSliceId, fileBackedStream, messageSliceSize, maxSlicingTries,
+                    options.getReplyTo(), options.getOnFailureCallback(), logContext);
+
+            final Serializable message = options.getMessage();
+            if (state.getTotalSlices() == 1 && message != null) {
+                LOG.debug("{}: Message does not need to be sliced - sending original message", logContext);
+                state.close();
+                sendTo(options, message, options.getReplyTo());
+                return;
+            }
+
+            final MessageSlice firstSlice = getNextSliceMessage(state);
+
+            LOG.debug("{}: Sending first slice: {}", logContext, firstSlice);
+
+            stateCache.put(messageSliceId, state);
+            sendTo(options, firstSlice, ActorRef.noSender());
+        } catch (IOException e) {
+            LOG.error("{}: Error initializing SlicedMessageState for {}", logContext, identifier, e);
+            if (state != null) {
+                state.close();
+            } else {
+                fileBackedStream.cleanup();
+            }
+
+            options.getOnFailureCallback().accept(e);
+        }
+    }
+
+    private void sendTo(SliceOptions options, Object message, ActorRef sender) {
+        if (options.getSendToRef() != null) {
+            options.getSendToRef().tell(message, sender);
+        } else {
+            options.getSendToSelection().tell(message, sender);
+        }
+    }
+
+    /**
+     * Invoked to handle messages pertaining to this class.
+     *
+     * @param message the message
+     * @return true if the message was handled, false otherwise
+     */
+    public boolean handleMessage(final Object message) {
+        if (message instanceof MessageSliceReply) {
+            LOG.debug("{}: handleMessage: {}", logContext, message);
+            onMessageSliceReply((MessageSliceReply) message);
+            return true;
+        }
+
+        return false;
+    }
+
+    /**
+     * Checks for and removes sliced message state that has expired due to inactivity from the assembling component
+     * on the other end.
+     */
+    public void checkExpiredSlicedMessageState() {
+        if (stateCache.size() > 0) {
+            stateCache.cleanUp();
+        }
+    }
+
+    /**
+     * Closes and removes all in-progress sliced message state.
+     */
+    @Override
+    public void close() {
+        LOG.debug("{}: Closing", logContext);
+        stateCache.invalidateAll();
+    }
+
+    private MessageSlice getNextSliceMessage(SlicedMessageState<ActorRef> state) throws IOException {
+        final byte[] firstSliceBytes = state.getNextSlice();
+        return new MessageSlice(state.getIdentifier(), firstSliceBytes, state.getCurrentSliceIndex(),
+                state.getTotalSlices(), state.getLastSliceHashCode(), state.getReplyTarget());
+    }
+
+    private void onMessageSliceReply(final MessageSliceReply reply) {
+        final Identifier identifier = reply.getIdentifier();
+        final SlicedMessageState<ActorRef> state = stateCache.getIfPresent(identifier);
+        if (state == null) {
+            LOG.warn("{}: SlicedMessageState not found for {}", logContext, reply);
+            reply.getSendTo().tell(new AbortSlicing(identifier), ActorRef.noSender());
+            return;
+        }
+
+        synchronized (state) {
+            try {
+                final Optional<MessageSliceException> failure = reply.getFailure();
+                if (failure.isPresent()) {
+                    LOG.warn("{}: Received failed {}", logContext, reply);
+                    processMessageSliceException(failure.get(), state, reply.getSendTo());
+                    return;
+                }
+
+                if (state.getCurrentSliceIndex() != reply.getSliceIndex()) {
+                    LOG.warn("{}: Slice index {} in {} does not match expected index {}", logContext,
+                            reply.getSliceIndex(), reply, state.getCurrentSliceIndex());
+                    reply.getSendTo().tell(new AbortSlicing(identifier), ActorRef.noSender());
+                    possiblyRetrySlicing(state, reply.getSendTo());
+                    return;
+                }
+
+                if (state.isLastSlice(reply.getSliceIndex())) {
+                    LOG.debug("{}: Received last slice reply for {}", logContext, identifier);
+                    removeState(identifier);
+                } else {
+                    final MessageSlice nextSlice = getNextSliceMessage(state);
+                    LOG.debug("{}: Sending next slice: {}", logContext, nextSlice);
+                    reply.getSendTo().tell(nextSlice, ActorRef.noSender());
+                }
+            } catch (IOException e) {
+                LOG.warn("{}: Error processing {}", logContext, reply, e);
+                fail(state, e);
+            }
+        }
+    }
+
+    private void processMessageSliceException(final MessageSliceException exception,
+            final SlicedMessageState<ActorRef> state, final ActorRef sendTo) throws IOException {
+        if (exception.isRetriable()) {
+            possiblyRetrySlicing(state, sendTo);
+        } else {
+            fail(state, exception.getCause() != null ? exception.getCause() : exception);
+        }
+    }
+
+    private void possiblyRetrySlicing(final SlicedMessageState<ActorRef> state, final ActorRef sendTo)
+            throws IOException {
+        if (state.canRetry()) {
+            LOG.info("{}: Retrying message slicing for {}", logContext, state.getIdentifier());
+            state.reset();
+            sendTo.tell(getNextSliceMessage(state), ActorRef.noSender());
+        } else {
+            String message = String.format("Maximum slicing retries reached for identifier %s - failing the message",
+                    state.getIdentifier());
+            LOG.warn(message);
+            fail(state, new RuntimeException(message));
+        }
+    }
+
+    private void removeState(final Identifier identifier) {
+        LOG.debug("{}: Removing state for {}", logContext, identifier);
+        stateCache.invalidate(identifier);
+    }
+
+    private void stateRemoved(RemovalNotification<Identifier, SlicedMessageState<ActorRef>> notification) {
+        final SlicedMessageState<ActorRef> state = notification.getValue();
+        state.close();
+        if (notification.wasEvicted()) {
+            LOG.warn("{}: SlicedMessageState for {} was expired from the cache", logContext, notification.getKey());
+            state.getOnFailureCallback().accept(new RuntimeException(String.format(
+                    "The slicing state for message identifier %s was expired due to inactivity from the assembling "
+                     + "component on the other end", state.getIdentifier())));
+        } else {
+            LOG.debug("{}: SlicedMessageState for {} was removed from the cache due to {}", logContext,
+                    notification.getKey(), notification.getCause());
+        }
+    }
+
+    private void fail(final SlicedMessageState<ActorRef> state, final Throwable failure) {
+        removeState(state.getIdentifier());
+        state.getOnFailureCallback().accept(failure);
+    }
+
+    @VisibleForTesting
+    boolean hasState(Identifier forIdentifier) {
+        boolean exists = stateCache.getIfPresent(forIdentifier) != null;
+        stateCache.cleanUp();
+        return exists;
+    }
+
+    public static class Builder {
+        private FileBackedOutputStreamFactory filedBackedStreamFactory;
+        private int messageSliceSize = -1;
+        private long expireStateAfterInactivityDuration = -1;
+        private TimeUnit expireStateAfterInactivityUnit = TimeUnit.MINUTES;
+        private int maxSlicingTries = DEFAULT_MAX_SLICING_TRIES;
+        private String logContext = "<no-context>";
+
+        /**
+         * Sets the factory for creating FileBackedOutputStream instances used for streaming messages. This factory
+         * is used by the {@link MessageSlicer#slice(SliceOptions)} method if a Serializable message is passed.
+         * If Serializable messages aren't passed then the factory need not be set.
+         *
+         * @param newFiledBackedStreamFactory the factory for creating FileBackedOutputStream instances
+         * @return this Builder
+         */
+        public Builder filedBackedStreamFactory(final FileBackedOutputStreamFactory newFiledBackedStreamFactory) {
+            this.filedBackedStreamFactory = Preconditions.checkNotNull(newFiledBackedStreamFactory);
+            return this;
+        }
+
+        /**
+         * Sets the maximum size (in bytes) for a message slice.
+         *
+         * @param newMessageSliceSize the maximum size (in bytes)
+         * @return this Builder
+         */
+        public Builder messageSliceSize(final int newMessageSliceSize) {
+            Preconditions.checkArgument(newMessageSliceSize > 0, "messageSliceSize must be > 0");
+            this.messageSliceSize = newMessageSliceSize;
+            return this;
+        }
+
+        /**
+         * Sets the maximum number of tries for slicing a message. If exceeded, slicing fails. The default is
+         * defined by {@link #DEFAULT_MAX_SLICING_TRIES}
+         *
+         * @param newMaxSlicingTries the maximum number of tries
+         * @return this Builder
+         */
+        public Builder maxSlicingTries(final int newMaxSlicingTries) {
+            Preconditions.checkArgument(newMaxSlicingTries > 0, "newMaxSlicingTries must be > 0");
+            this.maxSlicingTries = newMaxSlicingTries;
+            return this;
+        }
+
+        /**
+         * Sets the duration and time unit whereby sliced message state is purged from the cache and the associated
+         * failure callback is notified due to inactivity from the assembling component on the other end. By default,
+         * state is not purged due to inactivity.
+         *
+         * @param duration the length of time after which a state entry is purged
+         * @param unit the unit the duration is expressed in
+         * @return this Builder
+         */
+        public Builder expireStateAfterInactivity(final long duration, final TimeUnit unit) {
+            Preconditions.checkArgument(duration > 0, "duration must be > 0");
+            this.expireStateAfterInactivityDuration = duration;
+            this.expireStateAfterInactivityUnit = unit;
+            return this;
+        }
+
+        /**
+         * Sets the context for log messages.
+         *
+         * @param newLogContext the log context
+         * @return this Builder
+         */
+        public Builder logContext(final String newLogContext) {
+            this.logContext = Preconditions.checkNotNull(newLogContext);
+            return this;
+        }
+
+        /**
+         * Builds a new MessageSlicer instance.
+         *
+         * @return a new MessageSlicer
+         */
+        public MessageSlicer build() {
+            return new MessageSlicer(this);
+        }
+    }
+}
diff --git a/java/org/opendaylight/controller/cluster/messaging/SliceOptions.java b/java/org/opendaylight/controller/cluster/messaging/SliceOptions.java
new file mode 100644 (file)
index 0000000..630a554
--- /dev/null
@@ -0,0 +1,196 @@
+/*
+ * Copyright (c) 2017 Inocybe Technologies and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.messaging;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSelection;
+import com.google.common.base.Preconditions;
+import java.io.Serializable;
+import java.util.function.Consumer;
+import org.opendaylight.controller.cluster.io.FileBackedOutputStream;
+import org.opendaylight.controller.cluster.io.FileBackedOutputStreamFactory;
+import org.opendaylight.yangtools.concepts.Identifier;
+
+/**
+ * Options for slicing a message with {@link MessageSlicer#slice(SliceOptions)}.
+ *
+ * @author Thomas Pantelis
+ */
+public class SliceOptions {
+    private final Builder builder;
+
+    private SliceOptions(Builder builder) {
+        this.builder = builder;
+    }
+
+    public Identifier getIdentifier() {
+        return builder.identifier;
+    }
+
+    public FileBackedOutputStream getFileBackedStream() {
+        return builder.fileBackedStream;
+    }
+
+    public Serializable getMessage() {
+        return builder.message;
+    }
+
+    public ActorRef getSendToRef() {
+        return builder.sendToRef;
+    }
+
+    public ActorSelection getSendToSelection() {
+        return builder.sendToSelection;
+    }
+
+    public ActorRef getReplyTo() {
+        return builder.replyTo;
+    }
+
+    public Consumer<Throwable> getOnFailureCallback() {
+        return builder.onFailureCallback;
+    }
+
+    /**
+     * Returns a new Builder for creating MessageSlicer instances.
+     *
+     * @return a Builder instance
+     */
+    public static Builder builder() {
+        return new Builder();
+    }
+
+    public static class Builder {
+        private Identifier identifier;
+        private FileBackedOutputStream fileBackedStream;
+        private Serializable message;
+        private ActorRef sendToRef;
+        private ActorSelection sendToSelection;
+        private ActorRef replyTo;
+        private Consumer<Throwable> onFailureCallback;
+        private boolean sealed;
+
+        /**
+         * Sets the identifier of the component to slice.
+         *
+         * @param newIdentifier the identifier
+         * @return this Builder
+         */
+        public Builder identifier(final Identifier newIdentifier) {
+            checkSealed();
+            identifier = newIdentifier;
+            return this;
+        }
+
+        /**
+         * Sets the {@link FileBackedOutputStream} containing the message data to slice.
+         *
+         * @param newFileBackedStream the {@link FileBackedOutputStream}
+         * @return this Builder
+         */
+        public Builder fileBackedOutputStream(final FileBackedOutputStream newFileBackedStream) {
+            checkSealed();
+            fileBackedStream = newFileBackedStream;
+            return this;
+        }
+
+        /**
+         * Sets the message to slice. The message is first serialized to a {@link FileBackedOutputStream}. If the
+         * message doesn't need to be sliced, ie its serialized size is less than the maximum message slice size, then
+         * the original message is sent. Otherwise the first message slice is sent.
+         *
+         * <p>
+         * <b>Note:</b> a {@link FileBackedOutputStreamFactory} must be set in the {@link MessageSlicer}.
+         *
+         * @param newMessage the message
+         * @param <T> the Serializable message type
+         * @return this Builder
+         */
+        public <T extends Serializable> Builder message(final T newMessage) {
+            checkSealed();
+            message = newMessage;
+            return this;
+        }
+
+        /**
+         * Sets the reference of the actor to which to send the message slices.
+         *
+         * @param sendTo the ActorRef
+         * @return this Builder
+         */
+        public Builder sendTo(final ActorRef sendTo) {
+            checkSealed();
+            sendToRef = sendTo;
+            return this;
+        }
+
+        /**
+         * Sets the ActorSelection to which to send the message slices.
+         *
+         * @param sendTo the ActorSelection
+         * @return this Builder
+         */
+        public Builder sendTo(final ActorSelection sendTo) {
+            checkSealed();
+            sendToSelection = sendTo;
+            return this;
+        }
+
+        /**
+         * Sets the reference of the actor to which message slice replies should be sent. The actor should
+         * forward the replies to the {@link MessageSlicer#handleMessage(Object)} method.
+         *
+         * @param newReplyTo the ActorRef
+         * @return this Builder
+         */
+        public Builder replyTo(final ActorRef newReplyTo) {
+            checkSealed();
+            replyTo = newReplyTo;
+            return this;
+        }
+
+        /**
+         * Sets the callback to be notified of failure.
+         *
+         * @param newOnFailureCallback the callback
+         * @return this Builder
+         */
+        public Builder onFailureCallback(final Consumer<Throwable> newOnFailureCallback) {
+            checkSealed();
+            onFailureCallback = newOnFailureCallback;
+            return this;
+        }
+
+        /**
+         * Builds a new SliceOptions instance.
+         *
+         * @return a new SliceOptions
+         */
+        public SliceOptions build() {
+            sealed = true;
+
+            Preconditions.checkNotNull(identifier, "identifier must be set");
+            Preconditions.checkNotNull(replyTo, "replyTo must be set");
+            Preconditions.checkNotNull(onFailureCallback, "onFailureCallback must be set");
+            Preconditions.checkState(fileBackedStream == null || message == null,
+                    "Only one of message and fileBackedStream can be set");
+            Preconditions.checkState(!(fileBackedStream == null && message == null),
+                    "One of message and fileBackedStream must be set");
+            Preconditions.checkState(sendToRef == null || sendToSelection == null,
+                    "Only one of sendToRef and sendToSelection can be set");
+            Preconditions.checkState(!(sendToRef == null && sendToSelection == null),
+                    "One of sendToRef and sendToSelection must be set");
+
+            return new SliceOptions(this);
+        }
+
+        protected void checkSealed() {
+            Preconditions.checkState(!sealed, "Builder is already sealed - further modifications are not allowed");
+        }
+    }
+}
diff --git a/java/org/opendaylight/controller/cluster/messaging/SlicedMessageState.java b/java/org/opendaylight/controller/cluster/messaging/SlicedMessageState.java
new file mode 100644 (file)
index 0000000..8c3cb51
--- /dev/null
@@ -0,0 +1,239 @@
+/*
+ * Copyright (c) 2017 Inocybe Technologies and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.messaging;
+
+import com.google.common.io.ByteSource;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Arrays;
+import java.util.function.Consumer;
+import javax.annotation.concurrent.NotThreadSafe;
+import org.opendaylight.controller.cluster.io.FileBackedOutputStream;
+import org.opendaylight.yangtools.concepts.Identifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Maintains the state of a sliced message.
+ *
+ * @author Thomas Pantelis
+ * @see MessageSlicer
+ */
+@NotThreadSafe
+public class SlicedMessageState<T> implements AutoCloseable {
+    private static final Logger LOG = LoggerFactory.getLogger(SlicedMessageState.class);
+
+    // The index of the first slice that is sent.
+    static final int FIRST_SLICE_INDEX = 1;
+
+    // The initial hash code for a slice.
+    static final int INITIAL_SLICE_HASH_CODE = -1;
+
+    private final Identifier identifier;
+    private final int messageSliceSize;
+    private final FileBackedOutputStream fileBackedStream;
+    private final T replyTarget;
+    private final ByteSource messageBytes;
+    private final int totalSlices;
+    private final long totalMessageSize;
+    private final int maxRetries;
+    private final Consumer<Throwable> onFailureCallback;
+    private final String logContext;
+
+    private int currentByteOffset = 0;
+    private int currentSliceIndex = FIRST_SLICE_INDEX - 1;
+    private int lastSliceHashCode = INITIAL_SLICE_HASH_CODE;
+    private int currentSliceHashCode = INITIAL_SLICE_HASH_CODE;
+    private int tryCount = 1;
+    private InputStream messageInputStream;
+
+    /**
+     * Constructor.
+     *
+     * @param identifier the identifier for this instance
+     * @param fileBackedStream the FileBackedOutputStream containing the serialized data to slice
+     * @param messageSliceSize the maximum size (in bytes) for a message slice
+     * @param maxRetries the maximum number of retries
+     * @param replyTarget the user-defined target for sliced message replies
+     * @param onFailureCallback the callback to notify on failure
+     * @param logContext the context for log messages
+     * @throws IOException if an error occurs opening the input stream
+     */
+    public SlicedMessageState(final Identifier identifier, final FileBackedOutputStream fileBackedStream,
+            final int messageSliceSize, final int maxRetries, final T replyTarget,
+            final Consumer<Throwable> onFailureCallback, final String logContext) throws IOException {
+        this.identifier = identifier;
+        this.fileBackedStream = fileBackedStream;
+        this.messageSliceSize = messageSliceSize;
+        this.maxRetries = maxRetries;
+        this.replyTarget = replyTarget;
+        this.onFailureCallback = onFailureCallback;
+        this.logContext = logContext;
+
+        messageBytes = fileBackedStream.asByteSource();
+        totalMessageSize = messageBytes.size();
+        messageInputStream = messageBytes.openStream();
+
+        totalSlices = (int)(totalMessageSize / messageSliceSize + (totalMessageSize % messageSliceSize > 0 ? 1 : 0));
+
+        LOG.debug("{}: Message size: {} bytes, total slices to send: {}", logContext, totalMessageSize, totalSlices);
+    }
+
+    /**
+     * Returns the current slice index that has been sent.
+     *
+     * @return the current slice index that has been sent
+     */
+    public int getCurrentSliceIndex() {
+        return currentSliceIndex;
+    }
+
+    /**
+     * Returns the hash code of the last slice that was sent.
+     *
+     * @return the hash code of the last slice that was sent
+     */
+    public int getLastSliceHashCode() {
+        return lastSliceHashCode;
+    }
+
+    /**
+     * Returns the total number of slices to send.
+     *
+     * @return the total number of slices to send
+     */
+    public int getTotalSlices() {
+        return totalSlices;
+    }
+
+    /**
+     * Returns the identifier of this instance.
+     *
+     * @return the identifier
+     */
+    public Identifier getIdentifier() {
+        return identifier;
+    }
+
+    /**
+     * Returns the user-defined target for sliced message replies.
+     *
+     * @return the user-defined target
+     */
+    public T getReplyTarget() {
+        return replyTarget;
+    }
+
+    /**
+     *  Returns the callback to notify on failure.
+     *
+     * @return the callback to notify on failure
+     */
+    public Consumer<Throwable> getOnFailureCallback() {
+        return onFailureCallback;
+    }
+
+    /**
+     * Determines if the slicing can be retried.
+     *
+     * @return true if the slicing can be retried, false if the maximum number of retries has been reached
+     */
+    public boolean canRetry() {
+        return tryCount <= maxRetries;
+    }
+
+    /**
+     * Determines if the given index is the last slice to send.
+     *
+     * @param index the slice index to test
+     * @return true if the index is the last slice, false otherwise
+     */
+    public boolean isLastSlice(int index) {
+        return totalSlices == index;
+    }
+
+    /**
+     * Reads and returns the next slice of data.
+     *
+     * @return the next slice of data as a byte[]
+     * @throws IOException if an error occurs reading the data
+     */
+    public byte[] getNextSlice() throws IOException {
+        currentSliceIndex++;
+        final int start;
+        if (currentSliceIndex == FIRST_SLICE_INDEX) {
+            start = 0;
+        } else {
+            start = incrementByteOffset();
+        }
+
+        final int size;
+        if (messageSliceSize > totalMessageSize) {
+            size = (int) totalMessageSize;
+        } else if (start + messageSliceSize > totalMessageSize) {
+            size = (int) (totalMessageSize - start);
+        } else {
+            size = messageSliceSize;
+        }
+
+        LOG.debug("{}: getNextSlice: total size: {}, offset: {}, size: {}, index: {}", logContext, totalMessageSize,
+                start, size, currentSliceIndex);
+
+        byte[] nextSlice = new byte[size];
+        int numRead = messageInputStream.read(nextSlice);
+        if (numRead != size) {
+            throw new IOException(String.format(
+                    "The # of bytes read from the input stream, %d, does not match the expected # %d", numRead, size));
+        }
+
+        lastSliceHashCode = currentSliceHashCode;
+        currentSliceHashCode = Arrays.hashCode(nextSlice);
+
+        return nextSlice;
+    }
+
+    /**
+     * Resets this instance to restart slicing from the beginning.
+     *
+     * @throws IOException if an error occurs resetting the input stream
+     */
+    public void reset() throws IOException {
+        closeStream();
+
+        tryCount++;
+        currentByteOffset = 0;
+        currentSliceIndex = FIRST_SLICE_INDEX - 1;
+        lastSliceHashCode = INITIAL_SLICE_HASH_CODE;
+        currentSliceHashCode = INITIAL_SLICE_HASH_CODE;
+
+        messageInputStream = messageBytes.openStream();
+    }
+
+    private int incrementByteOffset() {
+        currentByteOffset  += messageSliceSize;
+        return currentByteOffset;
+    }
+
+    private void closeStream() {
+        if (messageInputStream != null) {
+            try {
+                messageInputStream.close();
+            } catch (IOException e) {
+                LOG.warn("{}: Error closing message stream", logContext, e);
+            }
+
+            messageInputStream = null;
+        }
+    }
+
+    @Override
+    public void close() {
+        closeStream();
+        fileBackedStream.cleanup();
+    }
+}