--- /dev/null
+/*
+ * Copyright (c) 2017 Inocybe Technologies and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.io;
+
+import javax.annotation.Nullable;
+
+/**
+ * A factory for creating {@link FileBackedOutputStream} instances.
+ *
+ * @author Thomas Pantelis
+ * @see FileBackedOutputStream
+ */
+public class FileBackedOutputStreamFactory {
+ private final int fileThreshold;
+ private final String fileDirectory;
+
+ /**
+ * Constructor.
+ *
+ * @param fileThreshold the number of bytes before streams should switch to buffering to a file
+ * @param fileDirectory the directory in which to create files if needed. If null, the default temp file
+ * location is used.
+ */
+ public FileBackedOutputStreamFactory(final int fileThreshold, final @Nullable String fileDirectory) {
+ this.fileThreshold = fileThreshold;
+ this.fileDirectory = fileDirectory;
+ }
+
+ /**
+ * Creates a new {@link FileBackedOutputStream} with the settings configured for this factory.
+ *
+ * @return a {@link FileBackedOutputStream} instance
+ */
+ public FileBackedOutputStream newInstance() {
+ return new FileBackedOutputStream(fileThreshold, fileDirectory);
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2017 Inocybe Technologies and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.messaging;
+
+import com.google.common.base.Preconditions;
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.io.Serializable;
+import org.opendaylight.yangtools.concepts.Identifier;
+
+/**
+ * Message sent to abort slicing.
+ *
+ * @author Thomas Pantelis
+ */
+class AbortSlicing implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ private final Identifier identifier;
+
+ AbortSlicing(final Identifier identifier) {
+ this.identifier = Preconditions.checkNotNull(identifier);
+ }
+
+ Identifier getIdentifier() {
+ return identifier;
+ }
+
+ @Override
+ public String toString() {
+ return "AbortSlicing [identifier=" + identifier + "]";
+ }
+
+ private Object writeReplace() {
+ return new Proxy(this);
+ }
+
+ private static class Proxy implements Externalizable {
+ private static final long serialVersionUID = 1L;
+
+ private AbortSlicing abortSlicing;
+
+ // checkstyle flags the public modifier as redundant which really doesn't make sense since it clearly isn't
+ // redundant. It is explicitly needed for Java serialization to be able to create instances via reflection.
+ @SuppressWarnings("checkstyle:RedundantModifier")
+ public Proxy() {
+ }
+
+ Proxy(AbortSlicing abortSlicing) {
+ this.abortSlicing = abortSlicing;
+ }
+
+ @Override
+ public void writeExternal(ObjectOutput out) throws IOException {
+ out.writeObject(abortSlicing.identifier);
+ }
+
+ @Override
+ public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ abortSlicing = new AbortSlicing((Identifier) in.readObject());
+ }
+
+ private Object readResolve() {
+ return abortSlicing;
+ }
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2017 Inocybe Technologies and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.messaging;
+
+import com.google.common.base.Preconditions;
+import com.google.common.io.ByteSource;
+import java.io.BufferedOutputStream;
+import java.io.IOException;
+import java.util.Arrays;
+import javax.annotation.concurrent.NotThreadSafe;
+import org.opendaylight.controller.cluster.io.FileBackedOutputStream;
+import org.opendaylight.controller.cluster.io.FileBackedOutputStreamFactory;
+import org.opendaylight.yangtools.concepts.Identifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Maintains the state of an assembled message.
+ *
+ * @author Thomas Pantelis
+ */
+@NotThreadSafe
+public class AssembledMessageState implements AutoCloseable {
+ private static final Logger LOG = LoggerFactory.getLogger(AssembledMessageState.class);
+
+ private final int totalSlices;
+ private final BufferedOutputStream bufferedStream;
+ private final FileBackedOutputStream fileBackedStream;
+ private final Identifier identifier;
+ private final String logContext;
+
+ private int lastSliceIndexReceived = SlicedMessageState.FIRST_SLICE_INDEX - 1;
+ private int lastSliceHashCodeReceived = SlicedMessageState.INITIAL_SLICE_HASH_CODE;
+ private boolean sealed = false;
+ private boolean closed = false;
+ private long assembledSize;
+
+ /**
+ * Constructor.
+ *
+ * @param identifier the identifier for this instance
+ * @param totalSlices the total number of slices to expect
+ * @param fileBackedStreamFactory factory for creating the FileBackedOutputStream instance used for streaming
+ * @param logContext the context for log messages
+ */
+ public AssembledMessageState(final Identifier identifier, final int totalSlices,
+ final FileBackedOutputStreamFactory fileBackedStreamFactory, final String logContext) {
+ this.identifier = identifier;
+ this.totalSlices = totalSlices;
+ this.logContext = logContext;
+
+ fileBackedStream = fileBackedStreamFactory.newInstance();
+ bufferedStream = new BufferedOutputStream(fileBackedStream);
+ }
+
+ /**
+ * Returns the identifier of this instance.
+ *
+ * @return the identifier
+ */
+ public Identifier getIdentifier() {
+ return identifier;
+ }
+
+ /**
+ * Adds a slice to the assembled stream.
+ *
+ * @param sliceIndex the index of the slice
+ * @param data the sliced data
+ * @param lastSliceHashCode the hash code of the last slice sent
+ * @return true if this is the last slice received, false otherwise
+ * @throws MessageSliceException
+ * <ul>
+ * <li>if the slice index is invalid</li>
+ * <li>if the last slice hash code is invalid</li>
+ * <li>if an error occurs writing the data to the stream</li>
+ * </ul>
+ * In addition, this instance is automatically closed and can no longer be used.
+ * @throws AssemblerSealedException if this instance is already sealed (ie has received all the slices)
+ * @throws AssemblerClosedException if this instance is already closed
+ */
+ public boolean addSlice(final int sliceIndex, final byte[] data, final int lastSliceHashCode)
+ throws MessageSliceException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("{}: addSlice: identifier: {}, sliceIndex: {}, lastSliceIndex: {}, assembledSize: {}, "
+ + "sliceHashCode: {}, lastSliceHashCode: {}", logContext, identifier, sliceIndex,
+ lastSliceIndexReceived, assembledSize, lastSliceHashCode, lastSliceHashCodeReceived);
+ }
+
+ try {
+ validateSlice(sliceIndex, lastSliceHashCode);
+
+ assembledSize += data.length;
+ lastSliceIndexReceived = sliceIndex;
+ lastSliceHashCodeReceived = Arrays.hashCode(data);
+
+ bufferedStream.write(data);
+
+ sealed = sliceIndex == totalSlices;
+ if (sealed) {
+ bufferedStream.close();
+ }
+ } catch (IOException e) {
+ close();
+ throw new MessageSliceException(String.format("Error writing data for slice %d of message %s",
+ sliceIndex, identifier), e);
+ }
+
+ return sealed;
+ }
+
+ /**
+ * Returns the assembled bytes as a ByteSource. This method must only be called after this instance is sealed.
+ *
+ * @return a ByteSource containing the assembled bytes
+ * @throws IOException if an error occurs obtaining the assembled bytes
+ * @throws IllegalStateException is this instance is not sealed
+ */
+ public ByteSource getAssembledBytes() throws IOException {
+ Preconditions.checkState(sealed, "Last slice not received yet");
+ return fileBackedStream.asByteSource();
+ }
+
+ private void validateSlice(final int sliceIndex, final int lastSliceHashCode) throws MessageSliceException {
+ if (closed) {
+ throw new AssemblerClosedException(identifier);
+ }
+
+ if (sealed) {
+ throw new AssemblerSealedException(String.format(
+ "Received slice index for message %s but all %d expected slices have already already received.",
+ identifier, totalSlices));
+ }
+
+ if (lastSliceIndexReceived + 1 != sliceIndex) {
+ close();
+ throw new MessageSliceException(String.format("Expected sliceIndex %d but got %d for message %s",
+ lastSliceIndexReceived + 1, sliceIndex, identifier), true);
+ }
+
+ if (lastSliceHashCode != lastSliceHashCodeReceived) {
+ close();
+ throw new MessageSliceException(String.format("The hash code of the recorded last slice (%d) does not "
+ + "match the senders last hash code (%d) for message %s", lastSliceHashCodeReceived,
+ lastSliceHashCode, identifier), true);
+ }
+ }
+
+ @Override
+ public void close() {
+ if (closed) {
+ return;
+ }
+
+ closed = true;
+ if (!sealed) {
+ try {
+ bufferedStream.close();
+ } catch (IOException e) {
+ LOG.debug("{}: Error closing output stream", logContext, e);
+ }
+ }
+
+ fileBackedStream.cleanup();
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2017 Inocybe Technologies and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.messaging;
+
+import org.opendaylight.yangtools.concepts.Identifier;
+
+/**
+ * A MessageSliceException indicating the message assembler has already been closed.
+ *
+ * @author Thomas Pantelis
+ */
+public class AssemblerClosedException extends MessageSliceException {
+ private static final long serialVersionUID = 1L;
+
+ /**
+ * Constructs an instance.
+ *
+ * @param identifier the identifier whose state was closed
+ */
+ public AssemblerClosedException(final Identifier identifier) {
+ super(String.format("Message assembler for %s has already been closed", identifier), false);
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2017 Inocybe Technologies and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.messaging;
+
+/**
+ * A MessageSliceException indicating the message assembler has already been sealed.
+ *
+ * @author Thomas Pantelis
+ */
+public class AssemblerSealedException extends MessageSliceException {
+ private static final long serialVersionUID = 1L;
+
+ /**
+ * Constructs an instance.
+ *
+ * @param message he detail message
+ */
+ public AssemblerSealedException(String message) {
+ super(message, false);
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2017 Inocybe Technologies and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.messaging;
+
+import akka.actor.ActorRef;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
+import com.google.common.io.ByteSource;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiConsumer;
+import javax.annotation.Nonnull;
+import org.opendaylight.controller.cluster.io.FileBackedOutputStreamFactory;
+import org.opendaylight.yangtools.concepts.Identifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class re-assembles messages sliced into smaller chunks by {@link MessageSlicer}.
+ *
+ * @author Thomas Pantelis
+ * @see MessageSlicer
+ */
+public class MessageAssembler implements AutoCloseable {
+ private static final Logger LOG = LoggerFactory.getLogger(MessageAssembler.class);
+
+ private final Cache<Identifier, AssembledMessageState> stateCache;
+ private final FileBackedOutputStreamFactory filedBackedStreamFactory;
+ private final BiConsumer<Object, ActorRef> assembledMessageCallback;
+ private final String logContext;
+
+ private MessageAssembler(Builder builder) {
+ this.filedBackedStreamFactory = Preconditions.checkNotNull(builder.filedBackedStreamFactory,
+ "FiledBackedStreamFactory cannot be null");
+ this.assembledMessageCallback = Preconditions.checkNotNull(builder.assembledMessageCallback,
+ "assembledMessageCallback cannot be null");
+ this.logContext = builder.logContext;
+
+ stateCache = CacheBuilder.newBuilder()
+ .expireAfterAccess(builder.expireStateAfterInactivityDuration, builder.expireStateAfterInactivityUnit)
+ .removalListener((RemovalListener<Identifier, AssembledMessageState>) notification ->
+ stateRemoved(notification)).build();
+ }
+
+ /**
+ * Returns a new Builder for creating MessageAssembler instances.
+ *
+ * @return a Builder instance
+ */
+ public static Builder builder() {
+ return new Builder();
+ }
+
+ /**
+ * Checks if the given message is handled by this class. If so, it should be forwarded to the
+ * {@link #handleMessage(Object, ActorRef)} method
+ *
+ * @param message the message to check
+ * @return true if handled, false otherwise
+ */
+ public static boolean isHandledMessage(Object message) {
+ return message instanceof MessageSlice || message instanceof AbortSlicing;
+ }
+
+ @Override
+ public void close() {
+ LOG.debug("{}: Closing", logContext);
+ stateCache.invalidateAll();
+ }
+
+ /**
+ * Checks for and removes assembled message state that has expired due to inactivity from the slicing component
+ * on the other end.
+ */
+ public void checkExpiredAssembledMessageState() {
+ if (stateCache.size() > 0) {
+ stateCache.cleanUp();
+ }
+ }
+
+ /**
+ * Invoked to handle message slices and other messages pertaining to this class.
+ *
+ * @param message the message
+ * @param sendTo the reference of the actor to which subsequent message slices should be sent
+ * @return true if the message was handled, false otherwise
+ */
+ public boolean handleMessage(final Object message, final @Nonnull ActorRef sendTo) {
+ if (message instanceof MessageSlice) {
+ LOG.debug("{}: handleMessage: {}", logContext, message);
+ onMessageSlice((MessageSlice) message, sendTo);
+ return true;
+ } else if (message instanceof AbortSlicing) {
+ LOG.debug("{}: handleMessage: {}", logContext, message);
+ onAbortSlicing((AbortSlicing) message);
+ return true;
+ }
+
+ return false;
+ }
+
+ private void onMessageSlice(final MessageSlice messageSlice, final ActorRef sendTo) {
+ final Identifier identifier = messageSlice.getIdentifier();
+ try {
+ final AssembledMessageState state = stateCache.get(identifier, () -> createState(messageSlice));
+ processMessageSliceForState(messageSlice, state, sendTo);
+ } catch (ExecutionException e) {
+ final MessageSliceException messageSliceEx;
+ final Throwable cause = e.getCause();
+ if (cause instanceof MessageSliceException) {
+ messageSliceEx = (MessageSliceException) cause;
+ } else {
+ messageSliceEx = new MessageSliceException(String.format(
+ "Error creating state for identifier %s", identifier), cause);
+ }
+
+ messageSlice.getReplyTo().tell(MessageSliceReply.failed(identifier, messageSliceEx, sendTo),
+ ActorRef.noSender());
+ }
+ }
+
+ private AssembledMessageState createState(final MessageSlice messageSlice) throws MessageSliceException {
+ final Identifier identifier = messageSlice.getIdentifier();
+ if (messageSlice.getSliceIndex() == SlicedMessageState.FIRST_SLICE_INDEX) {
+ LOG.debug("{}: Received first slice for {} - creating AssembledMessageState", logContext, identifier);
+ return new AssembledMessageState(identifier, messageSlice.getTotalSlices(),
+ filedBackedStreamFactory, logContext);
+ }
+
+ LOG.debug("{}: AssembledMessageState not found for {} - returning failed reply", logContext, identifier);
+ throw new MessageSliceException(String.format(
+ "No assembled state found for identifier %s and slice index %s", identifier,
+ messageSlice.getSliceIndex()), true);
+ }
+
+ private void processMessageSliceForState(final MessageSlice messageSlice, AssembledMessageState state,
+ final ActorRef sendTo) {
+ final Identifier identifier = messageSlice.getIdentifier();
+ final ActorRef replyTo = messageSlice.getReplyTo();
+ Object reAssembledMessage = null;
+ synchronized (state) {
+ final int sliceIndex = messageSlice.getSliceIndex();
+ try {
+ final MessageSliceReply successReply = MessageSliceReply.success(identifier, sliceIndex, sendTo);
+ if (state.addSlice(sliceIndex, messageSlice.getData(), messageSlice.getLastSliceHashCode())) {
+ LOG.debug("{}: Received last slice for {}", logContext, identifier);
+
+ reAssembledMessage = reAssembleMessage(state);
+
+ replyTo.tell(successReply, ActorRef.noSender());
+ removeState(identifier);
+ } else {
+ LOG.debug("{}: Added slice for {} - expecting more", logContext, identifier);
+ replyTo.tell(successReply, ActorRef.noSender());
+ }
+ } catch (MessageSliceException e) {
+ LOG.warn("{}: Error processing {}", logContext, messageSlice, e);
+ replyTo.tell(MessageSliceReply.failed(identifier, e, sendTo), ActorRef.noSender());
+ removeState(identifier);
+ }
+ }
+
+ if (reAssembledMessage != null) {
+ LOG.debug("{}: Notifying callback of re-assembled message {}", logContext, reAssembledMessage);
+ assembledMessageCallback.accept(reAssembledMessage, replyTo);
+ }
+ }
+
+ private Object reAssembleMessage(final AssembledMessageState state) throws MessageSliceException {
+ try {
+ final ByteSource assembledBytes = state.getAssembledBytes();
+ try (ObjectInputStream in = new ObjectInputStream(assembledBytes.openStream())) {
+ return in.readObject();
+ }
+
+ } catch (IOException | ClassNotFoundException e) {
+ throw new MessageSliceException(String.format("Error re-assembling bytes for identifier %s",
+ state.getIdentifier()), e);
+ }
+ }
+
+ private void onAbortSlicing(AbortSlicing message) {
+ removeState(message.getIdentifier());
+ }
+
+ private void removeState(final Identifier identifier) {
+ LOG.debug("{}: Removing state for {}", logContext, identifier);
+ stateCache.invalidate(identifier);
+ }
+
+ private void stateRemoved(RemovalNotification<Identifier, AssembledMessageState> notification) {
+ if (notification.wasEvicted()) {
+ LOG.warn("{}: AssembledMessageState for {} was expired from the cache", logContext, notification.getKey());
+ } else {
+ LOG.debug("{}: AssembledMessageState for {} was removed from the cache due to {}", logContext,
+ notification.getKey(), notification.getCause());
+ }
+
+ notification.getValue().close();
+ }
+
+ @VisibleForTesting
+ boolean hasState(Identifier forIdentifier) {
+ boolean exists = stateCache.getIfPresent(forIdentifier) != null;
+ stateCache.cleanUp();
+ return exists;
+ }
+
+ public static class Builder {
+ private FileBackedOutputStreamFactory filedBackedStreamFactory;
+ private BiConsumer<Object, ActorRef> assembledMessageCallback;
+ private long expireStateAfterInactivityDuration = 1;
+ private TimeUnit expireStateAfterInactivityUnit = TimeUnit.MINUTES;
+ private String logContext = "<no-context>";
+
+ /**
+ * Sets the factory for creating FileBackedOutputStream instances used for streaming messages.
+ *
+ * @param newFiledBackedStreamFactory the factory for creating FileBackedOutputStream instances
+ * @return this Builder
+ */
+ public Builder filedBackedStreamFactory(final FileBackedOutputStreamFactory newFiledBackedStreamFactory) {
+ this.filedBackedStreamFactory = Preconditions.checkNotNull(newFiledBackedStreamFactory);
+ return this;
+ }
+
+ /**
+ * Sets the Consumer callback for assembled messages. The callback takes the assembled message and the
+ * original sender ActorRef as arguments.
+ *
+ * @param newAssembledMessageCallback the Consumer callback
+ * @return this Builder
+ */
+ public Builder assembledMessageCallback(final BiConsumer<Object, ActorRef> newAssembledMessageCallback) {
+ this.assembledMessageCallback = newAssembledMessageCallback;
+ return this;
+ }
+
+ /**
+ * Sets the duration and time unit whereby assembled message state is purged from the cache due to
+ * inactivity from the slicing component on the other end. By default, state is purged after 1 minute of
+ * inactivity.
+ *
+ * @param duration the length of time after which a state entry is purged
+ * @param unit the unit the duration is expressed in
+ * @return this Builder
+ */
+ public Builder expireStateAfterInactivity(final long duration, final TimeUnit unit) {
+ Preconditions.checkArgument(duration > 0, "duration must be > 0");
+ this.expireStateAfterInactivityDuration = duration;
+ this.expireStateAfterInactivityUnit = unit;
+ return this;
+ }
+
+ /**
+ * Sets the context for log messages.
+ *
+ * @param newLogContext the log context
+ * @return this Builder
+ */
+ public Builder logContext(final String newLogContext) {
+ this.logContext = newLogContext;
+ return this;
+ }
+
+ /**
+ * Builds a new MessageAssembler instance.
+ *
+ * @return a new MessageAssembler
+ */
+ public MessageAssembler build() {
+ return new MessageAssembler(this);
+ }
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2017 Inocybe Technologies and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.messaging;
+
+import akka.actor.ActorRef;
+import akka.serialization.JavaSerializer;
+import akka.serialization.Serialization;
+import com.google.common.base.Preconditions;
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.io.Serializable;
+import org.opendaylight.yangtools.concepts.Identifier;
+
+/**
+ * Represents a sliced message chunk.
+ *
+ * @author Thomas Pantelis
+ */
+public class MessageSlice implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ private final Identifier identifier;
+ private final byte[] data;
+ private final int sliceIndex;
+ private final int totalSlices;
+ private final int lastSliceHashCode;
+ private final ActorRef replyTo;
+
+ public MessageSlice(Identifier identifier, byte[] data, int sliceIndex, int totalSlices, int lastSliceHashCode,
+ final ActorRef replyTo) {
+ this.identifier = Preconditions.checkNotNull(identifier);
+ this.data = Preconditions.checkNotNull(data);
+ this.sliceIndex = sliceIndex;
+ this.totalSlices = totalSlices;
+ this.lastSliceHashCode = lastSliceHashCode;
+ this.replyTo = Preconditions.checkNotNull(replyTo);
+ }
+
+ public Identifier getIdentifier() {
+ return identifier;
+ }
+
+ @SuppressFBWarnings(value = "EI_EXPOSE_REP", justification = "Exposes a mutable object stored in a field but "
+ + "this is OK since this class is merely a DTO and does not process the byte[] internally."
+ + "Also it would be inefficient to create a return copy as the byte[] could be large.")
+ public byte[] getData() {
+ return data;
+ }
+
+ public int getSliceIndex() {
+ return sliceIndex;
+ }
+
+ public int getTotalSlices() {
+ return totalSlices;
+ }
+
+ public int getLastSliceHashCode() {
+ return lastSliceHashCode;
+ }
+
+ public ActorRef getReplyTo() {
+ return replyTo;
+ }
+
+ @Override
+ public String toString() {
+ return "MessageSlice [identifier=" + identifier + ", data.length=" + data.length + ", sliceIndex="
+ + sliceIndex + ", totalSlices=" + totalSlices + ", lastSliceHashCode=" + lastSliceHashCode
+ + ", replyTo=" + replyTo + "]";
+ }
+
+ private Object writeReplace() {
+ return new Proxy(this);
+ }
+
+ private static class Proxy implements Externalizable {
+ private static final long serialVersionUID = 1L;
+
+ private MessageSlice messageSlice;
+
+ // checkstyle flags the public modifier as redundant which really doesn't make sense since it clearly isn't
+ // redundant. It is explicitly needed for Java serialization to be able to create instances via reflection.
+ @SuppressWarnings("checkstyle:RedundantModifier")
+ public Proxy() {
+ }
+
+ Proxy(MessageSlice messageSlice) {
+ this.messageSlice = messageSlice;
+ }
+
+ @Override
+ public void writeExternal(ObjectOutput out) throws IOException {
+ out.writeObject(messageSlice.identifier);
+ out.writeInt(messageSlice.sliceIndex);
+ out.writeInt(messageSlice.totalSlices);
+ out.writeInt(messageSlice.lastSliceHashCode);
+ out.writeObject(messageSlice.data);
+ out.writeObject(Serialization.serializedActorPath(messageSlice.replyTo));
+ }
+
+ @Override
+ public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ Identifier identifier = (Identifier) in.readObject();
+ int sliceIndex = in.readInt();
+ int totalSlices = in.readInt();
+ int lastSliceHashCode = in.readInt();
+ byte[] data = (byte[])in.readObject();
+ ActorRef replyTo = JavaSerializer.currentSystem().value().provider()
+ .resolveActorRef((String) in.readObject());
+
+ messageSlice = new MessageSlice(identifier, data, sliceIndex, totalSlices, lastSliceHashCode, replyTo);
+ }
+
+ private Object readResolve() {
+ return messageSlice;
+ }
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2017 Inocybe Technologies and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.messaging;
+
+/**
+ * An exception indicating a message slice failure.
+ *
+ * @author Thomas Pantelis
+ */
+public class MessageSliceException extends Exception {
+ private static final long serialVersionUID = 1L;
+
+ private final boolean isRetriable;
+
+ /**
+ * Constructs an instance.
+ *
+ * @param message the detail message
+ * @param cause the cause
+ */
+ public MessageSliceException(final String message, final Throwable cause) {
+ super(message, cause);
+ isRetriable = false;
+ }
+
+ /**
+ * Constructs an instance.
+ *
+ * @param message the detail message
+ * @param isRetriable if true, indicates the original operation can be retried
+ */
+ public MessageSliceException(final String message, final boolean isRetriable) {
+ super(message);
+ this.isRetriable = isRetriable;
+ }
+
+ /**
+ * Returns whether or not the original operation can be retried.
+ *
+ * @return true if it can be retried, false otherwise
+ */
+ public boolean isRetriable() {
+ return isRetriable;
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2017 Inocybe Technologies and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.messaging;
+
+import com.google.common.base.Preconditions;
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.concurrent.atomic.AtomicLong;
+import org.opendaylight.yangtools.concepts.Identifier;
+
+/**
+ * Identifier for a message slice that is composed of a client-supplied Identifier and an internal counter value.
+ *
+ * @author Thomas Pantelis
+ */
+final class MessageSliceIdentifier implements Identifier {
+ private static final long serialVersionUID = 1L;
+ private static final AtomicLong ID_COUNTER = new AtomicLong(1);
+
+ private final Identifier clientIdentifier;
+ private final long messageId;
+
+ MessageSliceIdentifier(final Identifier clientIdentifier) {
+ this(clientIdentifier, ID_COUNTER.getAndIncrement());
+ }
+
+ private MessageSliceIdentifier(final Identifier clientIdentifier, final long messageId) {
+ this.clientIdentifier = Preconditions.checkNotNull(clientIdentifier);
+ this.messageId = messageId;
+ }
+
+ Identifier getClientIdentifier() {
+ return clientIdentifier;
+ }
+
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + clientIdentifier.hashCode();
+ result = prime * result + (int) (messageId ^ messageId >>> 32);
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+
+ if (!(obj instanceof MessageSliceIdentifier)) {
+ return false;
+ }
+
+ MessageSliceIdentifier other = (MessageSliceIdentifier) obj;
+ return other.clientIdentifier.equals(clientIdentifier) && other.messageId == messageId;
+ }
+
+ @Override
+ public String toString() {
+ return "MessageSliceIdentifier [clientIdentifier=" + clientIdentifier + ", messageId=" + messageId + "]";
+ }
+
+ private Object writeReplace() {
+ return new Proxy(this);
+ }
+
+ private static class Proxy implements Externalizable {
+ private static final long serialVersionUID = 1L;
+
+ private MessageSliceIdentifier messageSliceId;
+
+ // checkstyle flags the public modifier as redundant which really doesn't make sense since it clearly isn't
+ // redundant. It is explicitly needed for Java serialization to be able to create instances via reflection.
+ @SuppressWarnings("checkstyle:RedundantModifier")
+ public Proxy() {
+ }
+
+ Proxy(MessageSliceIdentifier messageSliceId) {
+ this.messageSliceId = messageSliceId;
+ }
+
+ @Override
+ public void writeExternal(ObjectOutput out) throws IOException {
+ out.writeObject(messageSliceId.clientIdentifier);
+ out.writeLong(messageSliceId.messageId);
+ }
+
+ @Override
+ public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ messageSliceId = new MessageSliceIdentifier((Identifier) in.readObject(), in.readLong());
+ }
+
+ private Object readResolve() {
+ return messageSliceId;
+ }
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2017 Inocybe Technologies and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.messaging;
+
+import akka.actor.ActorRef;
+import akka.serialization.JavaSerializer;
+import akka.serialization.Serialization;
+import com.google.common.base.Preconditions;
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.io.Serializable;
+import java.util.Optional;
+import org.opendaylight.yangtools.concepts.Identifier;
+
+/**
+ * The reply message for {@link MessageSlice}.
+ *
+ * @author Thomas Pantelis
+ */
+public class MessageSliceReply implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ private final Identifier identifier;
+ private final int sliceIndex;
+ private final MessageSliceException failure;
+ private final ActorRef sendTo;
+
+ private MessageSliceReply(final Identifier identifier, final int sliceIndex, final MessageSliceException failure,
+ final ActorRef sendTo) {
+ this.identifier = Preconditions.checkNotNull(identifier);
+ this.sliceIndex = sliceIndex;
+ this.sendTo = Preconditions.checkNotNull(sendTo);
+ this.failure = failure;
+ }
+
+ public static MessageSliceReply success(final Identifier identifier, final int sliceIndex, final ActorRef sendTo) {
+ return new MessageSliceReply(identifier, sliceIndex, null, sendTo);
+ }
+
+ public static MessageSliceReply failed(final Identifier identifier, final MessageSliceException failure,
+ final ActorRef sendTo) {
+ return new MessageSliceReply(identifier, -1, failure, sendTo);
+ }
+
+ public Identifier getIdentifier() {
+ return identifier;
+ }
+
+ public int getSliceIndex() {
+ return sliceIndex;
+ }
+
+ public ActorRef getSendTo() {
+ return sendTo;
+ }
+
+ public Optional<MessageSliceException> getFailure() {
+ return Optional.ofNullable(failure);
+ }
+
+ @Override
+ public String toString() {
+ return "MessageSliceReply [identifier=" + identifier + ", sliceIndex=" + sliceIndex + ", failure=" + failure
+ + ", sendTo=" + sendTo + "]";
+ }
+
+ private Object writeReplace() {
+ return new Proxy(this);
+ }
+
+ private static class Proxy implements Externalizable {
+ private static final long serialVersionUID = 1L;
+
+ private MessageSliceReply messageSliceReply;
+
+ // checkstyle flags the public modifier as redundant which really doesn't make sense since it clearly isn't
+ // redundant. It is explicitly needed for Java serialization to be able to create instances via reflection.
+ @SuppressWarnings("checkstyle:RedundantModifier")
+ public Proxy() {
+ }
+
+ Proxy(MessageSliceReply messageSliceReply) {
+ this.messageSliceReply = messageSliceReply;
+ }
+
+ @Override
+ public void writeExternal(ObjectOutput out) throws IOException {
+ out.writeObject(messageSliceReply.identifier);
+ out.writeInt(messageSliceReply.sliceIndex);
+ out.writeObject(messageSliceReply.failure);
+ out.writeObject(Serialization.serializedActorPath(messageSliceReply.sendTo));
+ }
+
+ @Override
+ public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ final Identifier identifier = (Identifier) in.readObject();
+ final int sliceIndex = in.readInt();
+ final MessageSliceException failure = (MessageSliceException) in.readObject();
+ ActorRef sendTo = JavaSerializer.currentSystem().value().provider()
+ .resolveActorRef((String) in.readObject());
+
+ messageSliceReply = new MessageSliceReply(identifier, sliceIndex, failure, sendTo);
+ }
+
+ private Object readResolve() {
+ return messageSliceReply;
+ }
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2017 Inocybe Technologies and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.messaging;
+
+import akka.actor.ActorRef;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
+import java.io.IOException;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+import org.opendaylight.controller.cluster.io.FileBackedOutputStream;
+import org.opendaylight.controller.cluster.io.FileBackedOutputStreamFactory;
+import org.opendaylight.yangtools.concepts.Identifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class slices messages into smaller chunks. {@link MessageAssembler} is used to re-assemble the messages.
+ *
+ * @author Thomas Pantelis
+ * @see MessageAssembler
+ */
+public class MessageSlicer implements AutoCloseable {
+ private static final Logger LOG = LoggerFactory.getLogger(MessageSlicer.class);
+ public static final int DEFAULT_MAX_SLICING_TRIES = 3;
+
+ private final Cache<Identifier, SlicedMessageState<ActorRef>> stateCache;
+ private final FileBackedOutputStreamFactory filedBackedStreamFactory;
+ private final int messageSliceSize;
+ private final int maxSlicingTries;
+ private final String logContext;
+
+ private MessageSlicer(Builder builder) {
+ this.filedBackedStreamFactory = builder.filedBackedStreamFactory;
+ this.messageSliceSize = builder.messageSliceSize;
+ this.maxSlicingTries = builder.maxSlicingTries;
+ this.logContext = builder.logContext;
+
+ CacheBuilder<Identifier, SlicedMessageState<ActorRef>> cacheBuilder = CacheBuilder.newBuilder().removalListener(
+ (RemovalListener<Identifier, SlicedMessageState<ActorRef>>) notification -> stateRemoved(notification));
+ if (builder.expireStateAfterInactivityDuration > 0) {
+ cacheBuilder = cacheBuilder.expireAfterAccess(builder.expireStateAfterInactivityDuration,
+ builder.expireStateAfterInactivityUnit);
+ }
+
+ stateCache = cacheBuilder.build();
+ }
+
+ /**
+ * Returns a new Builder for creating MessageSlicer instances.
+ *
+ * @return a Builder instance
+ */
+ public static Builder builder() {
+ return new Builder();
+ }
+
+ /**
+ * Checks if the given message is handled by this class. If so, it should be forwarded to the
+ * {@link #handleMessage(Object)} method
+ *
+ * @param message the message to check
+ * @return true if handled, false otherwise
+ */
+ public static boolean isHandledMessage(Object message) {
+ return message instanceof MessageSliceReply;
+ }
+
+ /**
+ * Slices a message into chunks based on the serialized size, the maximum message slice size and the given
+ * options.
+ *
+ * @param options the SliceOptions
+ */
+ public void slice(SliceOptions options) {
+ final Identifier identifier = options.getIdentifier();
+ final Serializable message = options.getMessage();
+ final FileBackedOutputStream fileBackedStream;
+ if (message != null) {
+ LOG.debug("{}: slice: identifier: {}, message: {}", logContext, identifier, message);
+
+
+ Preconditions.checkNotNull(filedBackedStreamFactory,
+ "The FiledBackedStreamFactory must be set in order to call this slice method");
+
+ // Serialize the message to a FileBackedOutputStream.
+ fileBackedStream = filedBackedStreamFactory.newInstance();
+ try (ObjectOutputStream out = new ObjectOutputStream(fileBackedStream)) {
+ out.writeObject(message);
+ } catch (IOException e) {
+ LOG.debug("{}: Error serializing message for {}", logContext, identifier, e);
+ fileBackedStream.cleanup();
+ options.getOnFailureCallback().accept(e);
+ return;
+ }
+ } else {
+ fileBackedStream = options.getFileBackedStream();
+ }
+
+ initializeSlicing(options, fileBackedStream);
+ }
+
+ private void initializeSlicing(final SliceOptions options, final FileBackedOutputStream fileBackedStream) {
+ final Identifier identifier = options.getIdentifier();
+ MessageSliceIdentifier messageSliceId = new MessageSliceIdentifier(identifier);
+ SlicedMessageState<ActorRef> state = null;
+ try {
+ state = new SlicedMessageState<>(messageSliceId, fileBackedStream, messageSliceSize, maxSlicingTries,
+ options.getReplyTo(), options.getOnFailureCallback(), logContext);
+
+ final Serializable message = options.getMessage();
+ if (state.getTotalSlices() == 1 && message != null) {
+ LOG.debug("{}: Message does not need to be sliced - sending original message", logContext);
+ state.close();
+ sendTo(options, message, options.getReplyTo());
+ return;
+ }
+
+ final MessageSlice firstSlice = getNextSliceMessage(state);
+
+ LOG.debug("{}: Sending first slice: {}", logContext, firstSlice);
+
+ stateCache.put(messageSliceId, state);
+ sendTo(options, firstSlice, ActorRef.noSender());
+ } catch (IOException e) {
+ LOG.error("{}: Error initializing SlicedMessageState for {}", logContext, identifier, e);
+ if (state != null) {
+ state.close();
+ } else {
+ fileBackedStream.cleanup();
+ }
+
+ options.getOnFailureCallback().accept(e);
+ }
+ }
+
+ private void sendTo(SliceOptions options, Object message, ActorRef sender) {
+ if (options.getSendToRef() != null) {
+ options.getSendToRef().tell(message, sender);
+ } else {
+ options.getSendToSelection().tell(message, sender);
+ }
+ }
+
+ /**
+ * Invoked to handle messages pertaining to this class.
+ *
+ * @param message the message
+ * @return true if the message was handled, false otherwise
+ */
+ public boolean handleMessage(final Object message) {
+ if (message instanceof MessageSliceReply) {
+ LOG.debug("{}: handleMessage: {}", logContext, message);
+ onMessageSliceReply((MessageSliceReply) message);
+ return true;
+ }
+
+ return false;
+ }
+
+ /**
+ * Checks for and removes sliced message state that has expired due to inactivity from the assembling component
+ * on the other end.
+ */
+ public void checkExpiredSlicedMessageState() {
+ if (stateCache.size() > 0) {
+ stateCache.cleanUp();
+ }
+ }
+
+ /**
+ * Closes and removes all in-progress sliced message state.
+ */
+ @Override
+ public void close() {
+ LOG.debug("{}: Closing", logContext);
+ stateCache.invalidateAll();
+ }
+
+ private MessageSlice getNextSliceMessage(SlicedMessageState<ActorRef> state) throws IOException {
+ final byte[] firstSliceBytes = state.getNextSlice();
+ return new MessageSlice(state.getIdentifier(), firstSliceBytes, state.getCurrentSliceIndex(),
+ state.getTotalSlices(), state.getLastSliceHashCode(), state.getReplyTarget());
+ }
+
+ private void onMessageSliceReply(final MessageSliceReply reply) {
+ final Identifier identifier = reply.getIdentifier();
+ final SlicedMessageState<ActorRef> state = stateCache.getIfPresent(identifier);
+ if (state == null) {
+ LOG.warn("{}: SlicedMessageState not found for {}", logContext, reply);
+ reply.getSendTo().tell(new AbortSlicing(identifier), ActorRef.noSender());
+ return;
+ }
+
+ synchronized (state) {
+ try {
+ final Optional<MessageSliceException> failure = reply.getFailure();
+ if (failure.isPresent()) {
+ LOG.warn("{}: Received failed {}", logContext, reply);
+ processMessageSliceException(failure.get(), state, reply.getSendTo());
+ return;
+ }
+
+ if (state.getCurrentSliceIndex() != reply.getSliceIndex()) {
+ LOG.warn("{}: Slice index {} in {} does not match expected index {}", logContext,
+ reply.getSliceIndex(), reply, state.getCurrentSliceIndex());
+ reply.getSendTo().tell(new AbortSlicing(identifier), ActorRef.noSender());
+ possiblyRetrySlicing(state, reply.getSendTo());
+ return;
+ }
+
+ if (state.isLastSlice(reply.getSliceIndex())) {
+ LOG.debug("{}: Received last slice reply for {}", logContext, identifier);
+ removeState(identifier);
+ } else {
+ final MessageSlice nextSlice = getNextSliceMessage(state);
+ LOG.debug("{}: Sending next slice: {}", logContext, nextSlice);
+ reply.getSendTo().tell(nextSlice, ActorRef.noSender());
+ }
+ } catch (IOException e) {
+ LOG.warn("{}: Error processing {}", logContext, reply, e);
+ fail(state, e);
+ }
+ }
+ }
+
+ private void processMessageSliceException(final MessageSliceException exception,
+ final SlicedMessageState<ActorRef> state, final ActorRef sendTo) throws IOException {
+ if (exception.isRetriable()) {
+ possiblyRetrySlicing(state, sendTo);
+ } else {
+ fail(state, exception.getCause() != null ? exception.getCause() : exception);
+ }
+ }
+
+ private void possiblyRetrySlicing(final SlicedMessageState<ActorRef> state, final ActorRef sendTo)
+ throws IOException {
+ if (state.canRetry()) {
+ LOG.info("{}: Retrying message slicing for {}", logContext, state.getIdentifier());
+ state.reset();
+ sendTo.tell(getNextSliceMessage(state), ActorRef.noSender());
+ } else {
+ String message = String.format("Maximum slicing retries reached for identifier %s - failing the message",
+ state.getIdentifier());
+ LOG.warn(message);
+ fail(state, new RuntimeException(message));
+ }
+ }
+
+ private void removeState(final Identifier identifier) {
+ LOG.debug("{}: Removing state for {}", logContext, identifier);
+ stateCache.invalidate(identifier);
+ }
+
+ private void stateRemoved(RemovalNotification<Identifier, SlicedMessageState<ActorRef>> notification) {
+ final SlicedMessageState<ActorRef> state = notification.getValue();
+ state.close();
+ if (notification.wasEvicted()) {
+ LOG.warn("{}: SlicedMessageState for {} was expired from the cache", logContext, notification.getKey());
+ state.getOnFailureCallback().accept(new RuntimeException(String.format(
+ "The slicing state for message identifier %s was expired due to inactivity from the assembling "
+ + "component on the other end", state.getIdentifier())));
+ } else {
+ LOG.debug("{}: SlicedMessageState for {} was removed from the cache due to {}", logContext,
+ notification.getKey(), notification.getCause());
+ }
+ }
+
+ private void fail(final SlicedMessageState<ActorRef> state, final Throwable failure) {
+ removeState(state.getIdentifier());
+ state.getOnFailureCallback().accept(failure);
+ }
+
+ @VisibleForTesting
+ boolean hasState(Identifier forIdentifier) {
+ boolean exists = stateCache.getIfPresent(forIdentifier) != null;
+ stateCache.cleanUp();
+ return exists;
+ }
+
+ public static class Builder {
+ private FileBackedOutputStreamFactory filedBackedStreamFactory;
+ private int messageSliceSize = -1;
+ private long expireStateAfterInactivityDuration = -1;
+ private TimeUnit expireStateAfterInactivityUnit = TimeUnit.MINUTES;
+ private int maxSlicingTries = DEFAULT_MAX_SLICING_TRIES;
+ private String logContext = "<no-context>";
+
+ /**
+ * Sets the factory for creating FileBackedOutputStream instances used for streaming messages. This factory
+ * is used by the {@link MessageSlicer#slice(SliceOptions)} method if a Serializable message is passed.
+ * If Serializable messages aren't passed then the factory need not be set.
+ *
+ * @param newFiledBackedStreamFactory the factory for creating FileBackedOutputStream instances
+ * @return this Builder
+ */
+ public Builder filedBackedStreamFactory(final FileBackedOutputStreamFactory newFiledBackedStreamFactory) {
+ this.filedBackedStreamFactory = Preconditions.checkNotNull(newFiledBackedStreamFactory);
+ return this;
+ }
+
+ /**
+ * Sets the maximum size (in bytes) for a message slice.
+ *
+ * @param newMessageSliceSize the maximum size (in bytes)
+ * @return this Builder
+ */
+ public Builder messageSliceSize(final int newMessageSliceSize) {
+ Preconditions.checkArgument(newMessageSliceSize > 0, "messageSliceSize must be > 0");
+ this.messageSliceSize = newMessageSliceSize;
+ return this;
+ }
+
+ /**
+ * Sets the maximum number of tries for slicing a message. If exceeded, slicing fails. The default is
+ * defined by {@link #DEFAULT_MAX_SLICING_TRIES}
+ *
+ * @param newMaxSlicingTries the maximum number of tries
+ * @return this Builder
+ */
+ public Builder maxSlicingTries(final int newMaxSlicingTries) {
+ Preconditions.checkArgument(newMaxSlicingTries > 0, "newMaxSlicingTries must be > 0");
+ this.maxSlicingTries = newMaxSlicingTries;
+ return this;
+ }
+
+ /**
+ * Sets the duration and time unit whereby sliced message state is purged from the cache and the associated
+ * failure callback is notified due to inactivity from the assembling component on the other end. By default,
+ * state is not purged due to inactivity.
+ *
+ * @param duration the length of time after which a state entry is purged
+ * @param unit the unit the duration is expressed in
+ * @return this Builder
+ */
+ public Builder expireStateAfterInactivity(final long duration, final TimeUnit unit) {
+ Preconditions.checkArgument(duration > 0, "duration must be > 0");
+ this.expireStateAfterInactivityDuration = duration;
+ this.expireStateAfterInactivityUnit = unit;
+ return this;
+ }
+
+ /**
+ * Sets the context for log messages.
+ *
+ * @param newLogContext the log context
+ * @return this Builder
+ */
+ public Builder logContext(final String newLogContext) {
+ this.logContext = Preconditions.checkNotNull(newLogContext);
+ return this;
+ }
+
+ /**
+ * Builds a new MessageSlicer instance.
+ *
+ * @return a new MessageSlicer
+ */
+ public MessageSlicer build() {
+ return new MessageSlicer(this);
+ }
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2017 Inocybe Technologies and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.messaging;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSelection;
+import com.google.common.base.Preconditions;
+import java.io.Serializable;
+import java.util.function.Consumer;
+import org.opendaylight.controller.cluster.io.FileBackedOutputStream;
+import org.opendaylight.controller.cluster.io.FileBackedOutputStreamFactory;
+import org.opendaylight.yangtools.concepts.Identifier;
+
+/**
+ * Options for slicing a message with {@link MessageSlicer#slice(SliceOptions)}.
+ *
+ * @author Thomas Pantelis
+ */
+public class SliceOptions {
+ private final Builder builder;
+
+ private SliceOptions(Builder builder) {
+ this.builder = builder;
+ }
+
+ public Identifier getIdentifier() {
+ return builder.identifier;
+ }
+
+ public FileBackedOutputStream getFileBackedStream() {
+ return builder.fileBackedStream;
+ }
+
+ public Serializable getMessage() {
+ return builder.message;
+ }
+
+ public ActorRef getSendToRef() {
+ return builder.sendToRef;
+ }
+
+ public ActorSelection getSendToSelection() {
+ return builder.sendToSelection;
+ }
+
+ public ActorRef getReplyTo() {
+ return builder.replyTo;
+ }
+
+ public Consumer<Throwable> getOnFailureCallback() {
+ return builder.onFailureCallback;
+ }
+
+ /**
+ * Returns a new Builder for creating MessageSlicer instances.
+ *
+ * @return a Builder instance
+ */
+ public static Builder builder() {
+ return new Builder();
+ }
+
+ public static class Builder {
+ private Identifier identifier;
+ private FileBackedOutputStream fileBackedStream;
+ private Serializable message;
+ private ActorRef sendToRef;
+ private ActorSelection sendToSelection;
+ private ActorRef replyTo;
+ private Consumer<Throwable> onFailureCallback;
+ private boolean sealed;
+
+ /**
+ * Sets the identifier of the component to slice.
+ *
+ * @param newIdentifier the identifier
+ * @return this Builder
+ */
+ public Builder identifier(final Identifier newIdentifier) {
+ checkSealed();
+ identifier = newIdentifier;
+ return this;
+ }
+
+ /**
+ * Sets the {@link FileBackedOutputStream} containing the message data to slice.
+ *
+ * @param newFileBackedStream the {@link FileBackedOutputStream}
+ * @return this Builder
+ */
+ public Builder fileBackedOutputStream(final FileBackedOutputStream newFileBackedStream) {
+ checkSealed();
+ fileBackedStream = newFileBackedStream;
+ return this;
+ }
+
+ /**
+ * Sets the message to slice. The message is first serialized to a {@link FileBackedOutputStream}. If the
+ * message doesn't need to be sliced, ie its serialized size is less than the maximum message slice size, then
+ * the original message is sent. Otherwise the first message slice is sent.
+ *
+ * <p>
+ * <b>Note:</b> a {@link FileBackedOutputStreamFactory} must be set in the {@link MessageSlicer}.
+ *
+ * @param newMessage the message
+ * @param <T> the Serializable message type
+ * @return this Builder
+ */
+ public <T extends Serializable> Builder message(final T newMessage) {
+ checkSealed();
+ message = newMessage;
+ return this;
+ }
+
+ /**
+ * Sets the reference of the actor to which to send the message slices.
+ *
+ * @param sendTo the ActorRef
+ * @return this Builder
+ */
+ public Builder sendTo(final ActorRef sendTo) {
+ checkSealed();
+ sendToRef = sendTo;
+ return this;
+ }
+
+ /**
+ * Sets the ActorSelection to which to send the message slices.
+ *
+ * @param sendTo the ActorSelection
+ * @return this Builder
+ */
+ public Builder sendTo(final ActorSelection sendTo) {
+ checkSealed();
+ sendToSelection = sendTo;
+ return this;
+ }
+
+ /**
+ * Sets the reference of the actor to which message slice replies should be sent. The actor should
+ * forward the replies to the {@link MessageSlicer#handleMessage(Object)} method.
+ *
+ * @param newReplyTo the ActorRef
+ * @return this Builder
+ */
+ public Builder replyTo(final ActorRef newReplyTo) {
+ checkSealed();
+ replyTo = newReplyTo;
+ return this;
+ }
+
+ /**
+ * Sets the callback to be notified of failure.
+ *
+ * @param newOnFailureCallback the callback
+ * @return this Builder
+ */
+ public Builder onFailureCallback(final Consumer<Throwable> newOnFailureCallback) {
+ checkSealed();
+ onFailureCallback = newOnFailureCallback;
+ return this;
+ }
+
+ /**
+ * Builds a new SliceOptions instance.
+ *
+ * @return a new SliceOptions
+ */
+ public SliceOptions build() {
+ sealed = true;
+
+ Preconditions.checkNotNull(identifier, "identifier must be set");
+ Preconditions.checkNotNull(replyTo, "replyTo must be set");
+ Preconditions.checkNotNull(onFailureCallback, "onFailureCallback must be set");
+ Preconditions.checkState(fileBackedStream == null || message == null,
+ "Only one of message and fileBackedStream can be set");
+ Preconditions.checkState(!(fileBackedStream == null && message == null),
+ "One of message and fileBackedStream must be set");
+ Preconditions.checkState(sendToRef == null || sendToSelection == null,
+ "Only one of sendToRef and sendToSelection can be set");
+ Preconditions.checkState(!(sendToRef == null && sendToSelection == null),
+ "One of sendToRef and sendToSelection must be set");
+
+ return new SliceOptions(this);
+ }
+
+ protected void checkSealed() {
+ Preconditions.checkState(!sealed, "Builder is already sealed - further modifications are not allowed");
+ }
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2017 Inocybe Technologies and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.messaging;
+
+import com.google.common.io.ByteSource;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Arrays;
+import java.util.function.Consumer;
+import javax.annotation.concurrent.NotThreadSafe;
+import org.opendaylight.controller.cluster.io.FileBackedOutputStream;
+import org.opendaylight.yangtools.concepts.Identifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Maintains the state of a sliced message.
+ *
+ * @author Thomas Pantelis
+ * @see MessageSlicer
+ */
+@NotThreadSafe
+public class SlicedMessageState<T> implements AutoCloseable {
+ private static final Logger LOG = LoggerFactory.getLogger(SlicedMessageState.class);
+
+ // The index of the first slice that is sent.
+ static final int FIRST_SLICE_INDEX = 1;
+
+ // The initial hash code for a slice.
+ static final int INITIAL_SLICE_HASH_CODE = -1;
+
+ private final Identifier identifier;
+ private final int messageSliceSize;
+ private final FileBackedOutputStream fileBackedStream;
+ private final T replyTarget;
+ private final ByteSource messageBytes;
+ private final int totalSlices;
+ private final long totalMessageSize;
+ private final int maxRetries;
+ private final Consumer<Throwable> onFailureCallback;
+ private final String logContext;
+
+ private int currentByteOffset = 0;
+ private int currentSliceIndex = FIRST_SLICE_INDEX - 1;
+ private int lastSliceHashCode = INITIAL_SLICE_HASH_CODE;
+ private int currentSliceHashCode = INITIAL_SLICE_HASH_CODE;
+ private int tryCount = 1;
+ private InputStream messageInputStream;
+
+ /**
+ * Constructor.
+ *
+ * @param identifier the identifier for this instance
+ * @param fileBackedStream the FileBackedOutputStream containing the serialized data to slice
+ * @param messageSliceSize the maximum size (in bytes) for a message slice
+ * @param maxRetries the maximum number of retries
+ * @param replyTarget the user-defined target for sliced message replies
+ * @param onFailureCallback the callback to notify on failure
+ * @param logContext the context for log messages
+ * @throws IOException if an error occurs opening the input stream
+ */
+ public SlicedMessageState(final Identifier identifier, final FileBackedOutputStream fileBackedStream,
+ final int messageSliceSize, final int maxRetries, final T replyTarget,
+ final Consumer<Throwable> onFailureCallback, final String logContext) throws IOException {
+ this.identifier = identifier;
+ this.fileBackedStream = fileBackedStream;
+ this.messageSliceSize = messageSliceSize;
+ this.maxRetries = maxRetries;
+ this.replyTarget = replyTarget;
+ this.onFailureCallback = onFailureCallback;
+ this.logContext = logContext;
+
+ messageBytes = fileBackedStream.asByteSource();
+ totalMessageSize = messageBytes.size();
+ messageInputStream = messageBytes.openStream();
+
+ totalSlices = (int)(totalMessageSize / messageSliceSize + (totalMessageSize % messageSliceSize > 0 ? 1 : 0));
+
+ LOG.debug("{}: Message size: {} bytes, total slices to send: {}", logContext, totalMessageSize, totalSlices);
+ }
+
+ /**
+ * Returns the current slice index that has been sent.
+ *
+ * @return the current slice index that has been sent
+ */
+ public int getCurrentSliceIndex() {
+ return currentSliceIndex;
+ }
+
+ /**
+ * Returns the hash code of the last slice that was sent.
+ *
+ * @return the hash code of the last slice that was sent
+ */
+ public int getLastSliceHashCode() {
+ return lastSliceHashCode;
+ }
+
+ /**
+ * Returns the total number of slices to send.
+ *
+ * @return the total number of slices to send
+ */
+ public int getTotalSlices() {
+ return totalSlices;
+ }
+
+ /**
+ * Returns the identifier of this instance.
+ *
+ * @return the identifier
+ */
+ public Identifier getIdentifier() {
+ return identifier;
+ }
+
+ /**
+ * Returns the user-defined target for sliced message replies.
+ *
+ * @return the user-defined target
+ */
+ public T getReplyTarget() {
+ return replyTarget;
+ }
+
+ /**
+ * Returns the callback to notify on failure.
+ *
+ * @return the callback to notify on failure
+ */
+ public Consumer<Throwable> getOnFailureCallback() {
+ return onFailureCallback;
+ }
+
+ /**
+ * Determines if the slicing can be retried.
+ *
+ * @return true if the slicing can be retried, false if the maximum number of retries has been reached
+ */
+ public boolean canRetry() {
+ return tryCount <= maxRetries;
+ }
+
+ /**
+ * Determines if the given index is the last slice to send.
+ *
+ * @param index the slice index to test
+ * @return true if the index is the last slice, false otherwise
+ */
+ public boolean isLastSlice(int index) {
+ return totalSlices == index;
+ }
+
+ /**
+ * Reads and returns the next slice of data.
+ *
+ * @return the next slice of data as a byte[]
+ * @throws IOException if an error occurs reading the data
+ */
+ public byte[] getNextSlice() throws IOException {
+ currentSliceIndex++;
+ final int start;
+ if (currentSliceIndex == FIRST_SLICE_INDEX) {
+ start = 0;
+ } else {
+ start = incrementByteOffset();
+ }
+
+ final int size;
+ if (messageSliceSize > totalMessageSize) {
+ size = (int) totalMessageSize;
+ } else if (start + messageSliceSize > totalMessageSize) {
+ size = (int) (totalMessageSize - start);
+ } else {
+ size = messageSliceSize;
+ }
+
+ LOG.debug("{}: getNextSlice: total size: {}, offset: {}, size: {}, index: {}", logContext, totalMessageSize,
+ start, size, currentSliceIndex);
+
+ byte[] nextSlice = new byte[size];
+ int numRead = messageInputStream.read(nextSlice);
+ if (numRead != size) {
+ throw new IOException(String.format(
+ "The # of bytes read from the input stream, %d, does not match the expected # %d", numRead, size));
+ }
+
+ lastSliceHashCode = currentSliceHashCode;
+ currentSliceHashCode = Arrays.hashCode(nextSlice);
+
+ return nextSlice;
+ }
+
+ /**
+ * Resets this instance to restart slicing from the beginning.
+ *
+ * @throws IOException if an error occurs resetting the input stream
+ */
+ public void reset() throws IOException {
+ closeStream();
+
+ tryCount++;
+ currentByteOffset = 0;
+ currentSliceIndex = FIRST_SLICE_INDEX - 1;
+ lastSliceHashCode = INITIAL_SLICE_HASH_CODE;
+ currentSliceHashCode = INITIAL_SLICE_HASH_CODE;
+
+ messageInputStream = messageBytes.openStream();
+ }
+
+ private int incrementByteOffset() {
+ currentByteOffset += messageSliceSize;
+ return currentByteOffset;
+ }
+
+ private void closeStream() {
+ if (messageInputStream != null) {
+ try {
+ messageInputStream.close();
+ } catch (IOException e) {
+ LOG.warn("{}: Error closing message stream", logContext, e);
+ }
+
+ messageInputStream = null;
+ }
+ }
+
+ @Override
+ public void close() {
+ closeStream();
+ fileBackedStream.cleanup();
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2017 Inocybe Technologies and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.messaging;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.commons.lang.SerializationUtils;
+import org.junit.Test;
+
+/**
+ * Unit tests for AbortSlicing.
+ *
+ * @author Thomas Pantelis
+ */
+public class AbortSlicingTest {
+
+ @Test
+ public void testSerialization() {
+ AbortSlicing expected = new AbortSlicing(new StringIdentifier("test"));
+ AbortSlicing cloned = (AbortSlicing) SerializationUtils.clone(expected);
+ assertEquals("getIdentifier", expected.getIdentifier(), cloned.getIdentifier());
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2017 Inocybe Technologies and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.messaging;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
+
+import akka.actor.ActorSystem;
+import akka.testkit.JavaTestKit;
+import akka.testkit.TestProbe;
+import com.google.common.io.ByteSource;
+import java.io.IOException;
+import java.io.InputStream;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.opendaylight.controller.cluster.io.FileBackedOutputStream;
+import org.opendaylight.controller.cluster.io.FileBackedOutputStreamFactory;
+
+/**
+ * Abstract base class for messaging tests.
+ *
+ * @author Thomas Pantelis
+ */
+public class AbstractMessagingTest {
+ protected static final StringIdentifier IDENTIFIER = new StringIdentifier("test");
+ protected static ActorSystem ACTOR_SYSTEM;
+
+ protected final TestProbe testProbe = TestProbe.apply(ACTOR_SYSTEM);
+
+ @Mock
+ protected FileBackedOutputStreamFactory mockFiledBackedStreamFactory;
+
+ @Mock
+ protected FileBackedOutputStream mockFiledBackedStream;
+
+ @Mock
+ protected ByteSource mockByteSource;
+
+ @Mock
+ protected InputStream mockInputStream;
+
+ @BeforeClass
+ public static void setupClass() throws IOException {
+ ACTOR_SYSTEM = ActorSystem.create("test");
+ }
+
+ @Before
+ public void setup() throws IOException {
+ MockitoAnnotations.initMocks(this);
+
+ doReturn(mockFiledBackedStream).when(mockFiledBackedStreamFactory).newInstance();
+ doNothing().when(mockFiledBackedStream).write(any(byte[].class), anyInt(), anyInt());
+ doNothing().when(mockFiledBackedStream).write(any(byte[].class));
+ doNothing().when(mockFiledBackedStream).write(anyInt());
+ doNothing().when(mockFiledBackedStream).close();
+ doNothing().when(mockFiledBackedStream).cleanup();
+ doNothing().when(mockFiledBackedStream).flush();
+ doReturn(mockByteSource).when(mockFiledBackedStream).asByteSource();
+
+ doReturn(mockInputStream).when(mockByteSource).openStream();
+ doReturn(mockInputStream).when(mockByteSource).openBufferedStream();
+ doReturn(10L).when(mockByteSource).size();
+
+ doReturn(0).when(mockInputStream).read(any(byte[].class));
+ }
+
+ @AfterClass
+ public static void tearDownClass() {
+ JavaTestKit.shutdownActorSystem(ACTOR_SYSTEM, Boolean.TRUE);
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2017 Inocybe Technologies and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.messaging;
+
+import java.io.Serializable;
+import java.util.Arrays;
+
+/**
+ * Serializable message that stores a byte[].
+ *
+ * @author Thomas Pantelis
+ */
+public class BytesMessage implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ private final byte[] bytes;
+
+ public BytesMessage(byte[] bytes) {
+ this.bytes = Arrays.copyOf(bytes, bytes.length);
+ }
+
+ @Override
+ public int hashCode() {
+ return Arrays.hashCode(bytes);
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (obj == null) {
+ return false;
+ }
+ if (getClass() != obj.getClass()) {
+ return false;
+ }
+ BytesMessage other = (BytesMessage) obj;
+ return Arrays.equals(bytes, other.bytes);
+ }
+
+ @Override
+ public String toString() {
+ return "BytesMessage [bytes=" + Arrays.toString(bytes) + "]";
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2017 Inocybe Technologies and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.messaging;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+import static org.opendaylight.controller.cluster.messaging.MessageSlicingIntegrationTest.assertAssembledMessage;
+import static org.opendaylight.controller.cluster.messaging.MessageSlicingIntegrationTest.assertFailedMessageSliceReply;
+import static org.opendaylight.controller.cluster.messaging.MessageSlicingIntegrationTest.assertSuccessfulMessageSliceReply;
+
+import akka.actor.ActorRef;
+import com.google.common.util.concurrent.Uninterruptibles;
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiConsumer;
+import org.apache.commons.lang3.SerializationUtils;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.opendaylight.controller.cluster.io.FileBackedOutputStream;
+import org.opendaylight.controller.cluster.messaging.MessageAssembler.Builder;
+
+/**
+ * Unit tests for MessageAssembler.
+ *
+ * @author Thomas Pantelis
+ */
+public class MessageAssemblerTest extends AbstractMessagingTest {
+
+ @Mock
+ private BiConsumer<Object, ActorRef> mockAssembledMessageCallback;
+
+ @Override
+ @Before
+ public void setup() throws IOException {
+ super.setup();
+
+ doNothing().when(mockAssembledMessageCallback).accept(any(Object.class), any(ActorRef.class));
+ }
+
+ @Test
+ public void testHandledMessages() {
+ final MessageSlice messageSlice = new MessageSlice(IDENTIFIER, new byte[0], 1, 1, 1, testProbe.ref());
+ final AbortSlicing abortSlicing = new AbortSlicing(IDENTIFIER);
+ assertEquals("isHandledMessage", Boolean.TRUE, MessageAssembler.isHandledMessage(messageSlice));
+ assertEquals("isHandledMessage", Boolean.TRUE, MessageAssembler.isHandledMessage(abortSlicing));
+ assertEquals("isHandledMessage", Boolean.FALSE, MessageSlicer.isHandledMessage(new Object()));
+
+ try (MessageAssembler assembler = newMessageAssembler("testHandledMessages")) {
+ assertEquals("handledMessage", Boolean.TRUE, assembler.handleMessage(messageSlice, testProbe.ref()));
+ assertEquals("handledMessage", Boolean.TRUE, assembler.handleMessage(abortSlicing, testProbe.ref()));
+ assertEquals("handledMessage", Boolean.FALSE, assembler.handleMessage(new Object(), testProbe.ref()));
+ }
+ }
+
+ @Test
+ public void testSingleMessageSlice() {
+ try (MessageAssembler assembler = newMessageAssembler("testSingleMessageSlice")) {
+ final FileBackedOutputStream fileBackStream = spy(new FileBackedOutputStream(100000000, null));
+ doReturn(fileBackStream).when(mockFiledBackedStreamFactory).newInstance();
+
+ final MessageSliceIdentifier identifier = new MessageSliceIdentifier(IDENTIFIER);
+ final BytesMessage message = new BytesMessage(new byte[]{1, 2, 3});
+
+ final MessageSlice messageSlice = new MessageSlice(identifier, SerializationUtils.serialize(message), 1, 1,
+ SlicedMessageState.INITIAL_SLICE_HASH_CODE, testProbe.ref());
+ assembler.handleMessage(messageSlice, testProbe.ref());
+
+ final MessageSliceReply reply = testProbe.expectMsgClass(MessageSliceReply.class);
+ assertSuccessfulMessageSliceReply(reply, IDENTIFIER, 1);
+
+ assertAssembledMessage(mockAssembledMessageCallback, message, testProbe.ref());
+
+ assertFalse("MessageAssembler did not remove state for " + identifier, assembler.hasState(identifier));
+ verify(fileBackStream).cleanup();
+ }
+ }
+
+ @Test
+ public void testMessageSliceWithByteSourceFailure() throws IOException {
+ try (MessageAssembler assembler = newMessageAssembler("testMessageSliceWithByteSourceFailure")) {
+ IOException mockFailure = new IOException("mock IOException");
+ doThrow(mockFailure).when(mockByteSource).openStream();
+ doThrow(mockFailure).when(mockByteSource).openBufferedStream();
+
+ final MessageSliceIdentifier identifier = new MessageSliceIdentifier(IDENTIFIER);
+ final BytesMessage message = new BytesMessage(new byte[]{1, 2, 3});
+
+ final MessageSlice messageSlice = new MessageSlice(identifier, SerializationUtils.serialize(message), 1, 1,
+ SlicedMessageState.INITIAL_SLICE_HASH_CODE, testProbe.ref());
+ assembler.handleMessage(messageSlice, testProbe.ref());
+
+ final MessageSliceReply reply = testProbe.expectMsgClass(MessageSliceReply.class);
+ assertFailedMessageSliceReply(reply, IDENTIFIER, false);
+ assertEquals("Failure cause", mockFailure, reply.getFailure().get().getCause());
+
+ assertFalse("MessageAssembler did not remove state for " + identifier, assembler.hasState(identifier));
+ verify(mockFiledBackedStream).cleanup();
+ }
+ }
+
+ @Test
+ public void testMessageSliceWithStreamWriteFailure() throws IOException {
+ try (MessageAssembler assembler = newMessageAssembler("testMessageSliceWithStreamWriteFailure")) {
+ IOException mockFailure = new IOException("mock IOException");
+ doThrow(mockFailure).when(mockFiledBackedStream).write(any(byte[].class), anyInt(), anyInt());
+ doThrow(mockFailure).when(mockFiledBackedStream).write(any(byte[].class));
+ doThrow(mockFailure).when(mockFiledBackedStream).write(anyInt());
+ doThrow(mockFailure).when(mockFiledBackedStream).flush();
+
+ final MessageSliceIdentifier identifier = new MessageSliceIdentifier(IDENTIFIER);
+ final BytesMessage message = new BytesMessage(new byte[]{1, 2, 3});
+
+ final MessageSlice messageSlice = new MessageSlice(identifier, SerializationUtils.serialize(message), 1, 1,
+ SlicedMessageState.INITIAL_SLICE_HASH_CODE, testProbe.ref());
+ assembler.handleMessage(messageSlice, testProbe.ref());
+
+ final MessageSliceReply reply = testProbe.expectMsgClass(MessageSliceReply.class);
+ assertFailedMessageSliceReply(reply, IDENTIFIER, false);
+ assertEquals("Failure cause", mockFailure, reply.getFailure().get().getCause());
+
+ assertFalse("MessageAssembler did not remove state for " + identifier, assembler.hasState(identifier));
+ verify(mockFiledBackedStream).cleanup();
+ }
+ }
+
+ @Test
+ public void testAssembledMessageStateExpiration() throws IOException {
+ final int expiryDuration = 200;
+ try (MessageAssembler assembler = newMessageAssemblerBuilder("testAssembledMessageStateExpiration")
+ .expireStateAfterInactivity(expiryDuration, TimeUnit.MILLISECONDS).build()) {
+ final MessageSliceIdentifier identifier = new MessageSliceIdentifier(IDENTIFIER);
+ final BytesMessage message = new BytesMessage(new byte[]{1, 2, 3});
+
+ final MessageSlice messageSlice = new MessageSlice(identifier, SerializationUtils.serialize(message), 1, 2,
+ SlicedMessageState.INITIAL_SLICE_HASH_CODE, testProbe.ref());
+ assembler.handleMessage(messageSlice, testProbe.ref());
+
+ final MessageSliceReply reply = testProbe.expectMsgClass(MessageSliceReply.class);
+ assertSuccessfulMessageSliceReply(reply, IDENTIFIER, 1);
+
+ assertTrue("MessageAssembler should have remove state for " + identifier, assembler.hasState(identifier));
+ Uninterruptibles.sleepUninterruptibly(expiryDuration + 50, TimeUnit.MILLISECONDS);
+ assertFalse("MessageAssembler did not remove state for " + identifier, assembler.hasState(identifier));
+
+ verify(mockFiledBackedStream).cleanup();
+ }
+ }
+
+ @Test
+ public void testFirstMessageSliceWithInvalidIndex() {
+ try (MessageAssembler assembler = newMessageAssembler("testFirstMessageSliceWithInvalidIndex")) {
+ final MessageSliceIdentifier identifier = new MessageSliceIdentifier(IDENTIFIER);
+ final MessageSlice messageSlice = new MessageSlice(identifier, new byte[0], 2, 3, 1, testProbe.ref());
+ assembler.handleMessage(messageSlice, testProbe.ref());
+
+ final MessageSliceReply reply = testProbe.expectMsgClass(MessageSliceReply.class);
+ assertFailedMessageSliceReply(reply, IDENTIFIER, true);
+ assertFalse("MessageAssembler should not have state for " + identifier, assembler.hasState(identifier));
+ }
+ }
+
+ private MessageAssembler newMessageAssembler(String logContext) {
+ return newMessageAssemblerBuilder(logContext).build();
+ }
+
+ private Builder newMessageAssemblerBuilder(String logContext) {
+ return MessageAssembler.builder().filedBackedStreamFactory(mockFiledBackedStreamFactory)
+ .assembledMessageCallback(mockAssembledMessageCallback).logContext(logContext);
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2017 Inocybe Technologies and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.messaging;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.commons.lang.SerializationUtils;
+import org.junit.Test;
+
+/**
+ * Unit tests for MessageSliceIdentifier.
+ *
+ * @author Thomas Pantelis
+ */
+public class MessageSliceIdentifierTest {
+
+ @Test
+ public void testSerialization() {
+ MessageSliceIdentifier expected = new MessageSliceIdentifier(new StringIdentifier("test"));
+ MessageSliceIdentifier cloned = (MessageSliceIdentifier) SerializationUtils.clone(expected);
+ assertEquals("cloned", expected, cloned);
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2017 Inocybe Technologies and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.messaging;
+
+import static org.junit.Assert.assertEquals;
+
+import akka.actor.ActorSystem;
+import akka.actor.ExtendedActorSystem;
+import akka.serialization.JavaSerializer;
+import akka.testkit.JavaTestKit;
+import akka.testkit.TestProbe;
+import org.apache.commons.lang.SerializationUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Unit tests for MessageSliceReply.
+ *
+ * @author Thomas Pantelis
+ */
+public class MessageSliceReplyTest {
+ private final ActorSystem actorSystem = ActorSystem.create("test");
+
+ @Before
+ public void setUp() {
+ JavaSerializer.currentSystem().value_$eq((ExtendedActorSystem) actorSystem);
+ }
+
+ @After
+ public void tearDown() {
+ JavaTestKit.shutdownActorSystem(actorSystem, Boolean.TRUE);
+ }
+
+ @Test
+ public void testSerialization() {
+ testSuccess();
+ testFailure();
+ }
+
+ private void testSuccess() {
+ MessageSliceReply expected = MessageSliceReply.success(new StringIdentifier("test"), 3,
+ TestProbe.apply(actorSystem).ref());
+ MessageSliceReply cloned = (MessageSliceReply) SerializationUtils.clone(expected);
+
+ assertEquals("getIdentifier", expected.getIdentifier(), cloned.getIdentifier());
+ assertEquals("getSliceIndex", expected.getSliceIndex(), cloned.getSliceIndex());
+ assertEquals("getSendTo", expected.getSendTo(), cloned.getSendTo());
+ assertEquals("getFailure present", Boolean.FALSE, cloned.getFailure().isPresent());
+ }
+
+ private void testFailure() {
+ MessageSliceReply expected = MessageSliceReply.failed(new StringIdentifier("test"),
+ new MessageSliceException("mock", true), TestProbe.apply(actorSystem).ref());
+ MessageSliceReply cloned = (MessageSliceReply) SerializationUtils.clone(expected);
+
+ assertEquals("getIdentifier", expected.getIdentifier(), cloned.getIdentifier());
+ assertEquals("getSliceIndex", expected.getSliceIndex(), cloned.getSliceIndex());
+ assertEquals("getSendTo", expected.getSendTo(), cloned.getSendTo());
+ assertEquals("getFailure present", Boolean.TRUE, cloned.getFailure().isPresent());
+ assertEquals("getFailure message", expected.getFailure().get().getMessage(),
+ cloned.getFailure().get().getMessage());
+ assertEquals("getFailure isRetriable", expected.getFailure().get().isRetriable(),
+ cloned.getFailure().get().isRetriable());
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2017 Inocybe Technologies and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.messaging;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+import akka.actor.ActorSystem;
+import akka.actor.ExtendedActorSystem;
+import akka.serialization.JavaSerializer;
+import akka.testkit.JavaTestKit;
+import akka.testkit.TestProbe;
+import org.apache.commons.lang.SerializationUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Unit tests for MessageSlice.
+ *
+ * @author Thomas Pantelis
+ */
+public class MessageSliceTest {
+ private final ActorSystem actorSystem = ActorSystem.create("test");
+
+ @Before
+ public void setUp() {
+ JavaSerializer.currentSystem().value_$eq((ExtendedActorSystem) actorSystem);
+ }
+
+ @After
+ public void tearDown() {
+ JavaTestKit.shutdownActorSystem(actorSystem, Boolean.TRUE);
+ }
+
+ @Test
+ public void testSerialization() {
+ byte[] data = new byte[1000];
+ for (int i = 0, j = 0; i < data.length; i++) {
+ data[i] = (byte)j;
+ if (++j >= 255) {
+ j = 0;
+ }
+ }
+
+ MessageSlice expected = new MessageSlice(new StringIdentifier("test"), data, 2, 3, 54321,
+ TestProbe.apply(actorSystem).ref());
+ MessageSlice cloned = (MessageSlice) SerializationUtils.clone(expected);
+
+ assertEquals("getIdentifier", expected.getIdentifier(), cloned.getIdentifier());
+ assertEquals("getSliceIndex", expected.getSliceIndex(), cloned.getSliceIndex());
+ assertEquals("getTotalSlices", expected.getTotalSlices(), cloned.getTotalSlices());
+ assertEquals("getLastSliceHashCode", expected.getLastSliceHashCode(), cloned.getLastSliceHashCode());
+ assertArrayEquals("getData", expected.getData(), cloned.getData());
+ assertEquals("getReplyTo", expected.getReplyTo(), cloned.getReplyTo());
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2017 Inocybe Technologies and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.messaging;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+
+import akka.actor.ActorRef;
+import com.google.common.util.concurrent.Uninterruptibles;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.opendaylight.yangtools.concepts.Identifier;
+
+/**
+ * Unit tests for MessageSlicer.
+ *
+ * @author Thomas Pantelis
+ */
+public class MessageSlicerTest extends AbstractMessagingTest {
+ @Mock
+ private Consumer<Throwable> mockOnFailureCallback;
+
+ @Override
+ @Before
+ public void setup() throws IOException {
+ super.setup();
+
+ doNothing().when(mockOnFailureCallback).accept(any(Throwable.class));
+ }
+
+ @Test
+ public void testHandledMessages() {
+ final MessageSliceReply reply = MessageSliceReply.success(IDENTIFIER, 1, testProbe.ref());
+ assertEquals("isHandledMessage", Boolean.TRUE, MessageSlicer.isHandledMessage(reply));
+ assertEquals("isHandledMessage", Boolean.FALSE, MessageSlicer.isHandledMessage(new Object()));
+
+ try (MessageSlicer slicer = newMessageSlicer("testHandledMessages", 100)) {
+ assertEquals("handledMessage", Boolean.TRUE, slicer.handleMessage(reply));
+ assertEquals("handledMessage", Boolean.FALSE, slicer.handleMessage(new Object()));
+ }
+ }
+
+ @Test
+ public void testSliceWithFailedSerialization() throws IOException {
+ IOException mockFailure = new IOException("mock IOException");
+ doThrow(mockFailure).when(mockFiledBackedStream).write(any(byte[].class), anyInt(), anyInt());
+ doThrow(mockFailure).when(mockFiledBackedStream).write(any(byte[].class));
+ doThrow(mockFailure).when(mockFiledBackedStream).write(anyInt());
+ doThrow(mockFailure).when(mockFiledBackedStream).flush();
+
+ try (MessageSlicer slicer = newMessageSlicer("testSliceWithFailedSerialization", 100)) {
+ slice(slicer, IDENTIFIER, new BytesMessage(new byte[]{}), testProbe.ref(), testProbe.ref(),
+ mockOnFailureCallback);
+
+ assertFailureCallback(IOException.class);
+ verify(mockFiledBackedStream).cleanup();
+ }
+ }
+
+ @Test
+ public void testSliceWithByteSourceFailure() throws IOException {
+ IOException mockFailure = new IOException("mock IOException");
+ doThrow(mockFailure).when(mockByteSource).openStream();
+ doThrow(mockFailure).when(mockByteSource).openBufferedStream();
+
+ try (MessageSlicer slicer = newMessageSlicer("testSliceWithByteSourceFailure", 100)) {
+ slice(slicer, IDENTIFIER, new BytesMessage(new byte[]{}), testProbe.ref(), testProbe.ref(),
+ mockOnFailureCallback);
+
+ assertFailureCallback(IOException.class);
+ verify(mockFiledBackedStream).cleanup();
+ }
+ }
+
+ @Test
+ public void testSliceWithInputStreamFailure() throws IOException {
+ doReturn(0).when(mockInputStream).read(any(byte[].class));
+
+ try (MessageSlicer slicer = newMessageSlicer("testSliceWithInputStreamFailure", 2)) {
+ slice(slicer, IDENTIFIER, new BytesMessage(new byte[]{}), testProbe.ref(), testProbe.ref(),
+ mockOnFailureCallback);
+
+ assertFailureCallback(IOException.class);
+ verify(mockFiledBackedStream).cleanup();
+ }
+ }
+
+ @Test
+ public void testMessageSliceReplyWithNoState() {
+ try (MessageSlicer slicer = newMessageSlicer("testMessageSliceReplyWithNoState", 1000)) {
+ slicer.handleMessage(MessageSliceReply.success(IDENTIFIER, 1, testProbe.ref()));
+ final AbortSlicing abortSlicing = testProbe.expectMsgClass(AbortSlicing.class);
+ assertEquals("Identifier", IDENTIFIER, abortSlicing.getIdentifier());
+ }
+ }
+
+ @Test
+ public void testCloseAllSlicedMessageState() throws IOException {
+ doReturn(1).when(mockInputStream).read(any(byte[].class));
+
+ final MessageSlicer slicer = newMessageSlicer("testCloseAllSlicedMessageState", 1);
+ slice(slicer, IDENTIFIER, new BytesMessage(new byte[]{1, 2}), testProbe.ref(), testProbe.ref(),
+ mockOnFailureCallback);
+
+ slicer.close();
+
+ verify(mockFiledBackedStream).cleanup();
+ verifyNoMoreInteractions(mockOnFailureCallback);
+ }
+
+ @Test
+ public void testCheckExpiredSlicedMessageState() throws IOException {
+ doReturn(1).when(mockInputStream).read(any(byte[].class));
+
+ final int expiryDuration = 200;
+ try (MessageSlicer slicer = MessageSlicer.builder().messageSliceSize(1)
+ .logContext("testCheckExpiredSlicedMessageState")
+ .filedBackedStreamFactory(mockFiledBackedStreamFactory)
+ .expireStateAfterInactivity(expiryDuration, TimeUnit.MILLISECONDS).build()) {
+ slice(slicer, IDENTIFIER, new BytesMessage(new byte[]{1, 2}), testProbe.ref(), testProbe.ref(),
+ mockOnFailureCallback);
+
+ Uninterruptibles.sleepUninterruptibly(expiryDuration + 50, TimeUnit.MILLISECONDS);
+ slicer.checkExpiredSlicedMessageState();
+
+ assertFailureCallback(RuntimeException.class);
+ verify(mockFiledBackedStream).cleanup();
+ }
+ }
+
+ private void assertFailureCallback(final Class<?> exceptionType) {
+ ArgumentCaptor<Throwable> exceptionCaptor = ArgumentCaptor.forClass(Throwable.class);
+ verify(mockOnFailureCallback).accept(exceptionCaptor.capture());
+ assertEquals("Exception type", exceptionType, exceptionCaptor.getValue().getClass());
+ }
+
+ private MessageSlicer newMessageSlicer(String logContext, final int messageSliceSize) {
+ return MessageSlicer.builder().messageSliceSize(messageSliceSize).logContext(logContext)
+ .filedBackedStreamFactory(mockFiledBackedStreamFactory).build();
+ }
+
+ static void slice(MessageSlicer slicer, Identifier identifier, Serializable message, ActorRef sendTo,
+ ActorRef replyTo, Consumer<Throwable> onFailureCallback) {
+ slicer.slice(SliceOptions.builder().identifier(identifier).message(message).sendTo(sendTo).replyTo(replyTo)
+ .onFailureCallback(onFailureCallback).build());
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2017 Inocybe Technologies and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.messaging;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.verify;
+import static org.opendaylight.controller.cluster.messaging.MessageSlicerTest.slice;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.testkit.JavaTestKit;
+import akka.testkit.TestProbe;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectOutputStream;
+import java.util.Arrays;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+import org.apache.commons.lang3.SerializationUtils;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.opendaylight.controller.cluster.io.FileBackedOutputStream;
+import org.opendaylight.controller.cluster.io.FileBackedOutputStreamFactory;
+import org.opendaylight.yangtools.concepts.Identifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * End-to-end integration tests for message slicing.
+ *
+ * @author Thomas Pantelis
+ */
+public class MessageSlicingIntegrationTest {
+ private static final Logger LOG = LoggerFactory.getLogger(MessageSlicingIntegrationTest.class);
+
+ private static final ActorSystem ACTOR_SYSTEM = ActorSystem.create("test");
+ private static final FileBackedOutputStreamFactory FILE_BACKED_STREAM_FACTORY =
+ new FileBackedOutputStreamFactory(1000000000, "target");
+ private static final Identifier IDENTIFIER = new StringIdentifier("stringId");
+ private static final int DONT_CARE = -1;
+
+ private final TestProbe sendToProbe = TestProbe.apply(ACTOR_SYSTEM);
+ private final TestProbe replyToProbe = TestProbe.apply(ACTOR_SYSTEM);
+
+ @SuppressWarnings("unchecked")
+ private final Consumer<Throwable> mockOnFailureCallback = mock(Consumer.class);
+
+ @SuppressWarnings("unchecked")
+ private final BiConsumer<Object, ActorRef> mockAssembledMessageCallback = mock(BiConsumer.class);
+
+ private final MessageAssembler assembler = MessageAssembler.builder()
+ .assembledMessageCallback(mockAssembledMessageCallback).logContext("test")
+ .filedBackedStreamFactory(FILE_BACKED_STREAM_FACTORY).build();
+
+ @Before
+ public void setup() {
+ doNothing().when(mockOnFailureCallback).accept(any(Throwable.class));
+ doNothing().when(mockAssembledMessageCallback).accept(any(Object.class), any(ActorRef.class));
+ }
+
+ @After
+ public void tearDown() {
+ assembler.close();
+ }
+
+ @AfterClass
+ public static void staticTearDown() {
+ JavaTestKit.shutdownActorSystem(ACTOR_SYSTEM, Boolean.TRUE);
+ }
+
+ @Test
+ public void testSlicingWithChunks() throws IOException {
+ LOG.info("testSlicingWithChunks starting");
+
+ // First slice a message where the messageSliceSize divides evenly into the serialized size.
+
+ byte[] emptyMessageBytes = SerializationUtils.serialize(new BytesMessage(new byte[]{}));
+ int messageSliceSize = 10;
+ int expTotalSlices = emptyMessageBytes.length / messageSliceSize;
+ ByteArrayOutputStream byteStream = new ByteArrayOutputStream();
+ if (emptyMessageBytes.length % messageSliceSize > 0) {
+ expTotalSlices++;
+ int padding = messageSliceSize - emptyMessageBytes.length % messageSliceSize;
+ byte value = 1;
+ for (int i = 0; i < padding; i++, value++) {
+ byteStream.write(value);
+ }
+ }
+
+ testSlicing("testSlicingWithChunks", messageSliceSize, expTotalSlices, byteStream.toByteArray());
+
+ // Now slice a message where the messageSliceSize doesn't divide evenly.
+
+ byteStream.write(new byte[]{100, 101, 102});
+ testSlicing("testSlicingWithChunks", messageSliceSize, expTotalSlices + 1, byteStream.toByteArray());
+
+ LOG.info("testSlicingWithChunks ending");
+ }
+
+ @Test
+ public void testSingleSlice() {
+ LOG.info("testSingleSlice starting");
+
+ // Slice a message where the serialized size is equal to the messageSliceSize. In this case it should
+ // just send the original message.
+
+ final BytesMessage message = new BytesMessage(new byte[]{1, 2, 3});
+ try (MessageSlicer slicer = newMessageSlicer("testSingleSlice", SerializationUtils.serialize(message).length)) {
+ slice(slicer, IDENTIFIER, message, sendToProbe.ref(), replyToProbe.ref(), mockOnFailureCallback);
+
+ final BytesMessage sentMessage = sendToProbe.expectMsgClass(BytesMessage.class);
+ assertEquals("Sent message", message, sentMessage);
+ }
+
+ LOG.info("testSingleSlice ending");
+ }
+
+ @Test
+ public void testSlicingWithRetry() {
+ LOG.info("testSlicingWithRetry starting");
+
+ final BytesMessage message = new BytesMessage(new byte[]{1, 2, 3});
+ final int messageSliceSize = SerializationUtils.serialize(message).length / 2;
+ try (MessageSlicer slicer = newMessageSlicer("testSlicingWithRetry", messageSliceSize)) {
+ slice(slicer, IDENTIFIER, message, sendToProbe.ref(), replyToProbe.ref(), mockOnFailureCallback);
+
+ MessageSlice sliceMessage = sendToProbe.expectMsgClass(MessageSlice.class);
+ assembler.handleMessage(sliceMessage, sendToProbe.ref());
+
+ // Swallow the reply and send the MessageSlice again - it should return a failed reply.
+ replyToProbe.expectMsgClass(MessageSliceReply.class);
+ assembler.handleMessage(sliceMessage, sendToProbe.ref());
+
+ final MessageSliceReply failedReply = replyToProbe.expectMsgClass(MessageSliceReply.class);
+ assertFailedMessageSliceReply(failedReply, IDENTIFIER, true);
+
+ // Send the failed reply - slicing should be retried from the beginning.
+
+ slicer.handleMessage(failedReply);
+ while (true) {
+ sliceMessage = sendToProbe.expectMsgClass(MessageSlice.class);
+ assembler.handleMessage(sliceMessage, sendToProbe.ref());
+
+ final MessageSliceReply reply = replyToProbe.expectMsgClass(MessageSliceReply.class);
+ assertSuccessfulMessageSliceReply(reply, IDENTIFIER, sliceMessage.getSliceIndex());
+ slicer.handleMessage(reply);
+
+ if (reply.getSliceIndex() == sliceMessage.getTotalSlices()) {
+ break;
+ }
+ }
+
+ assertAssembledMessage(message, replyToProbe.ref());
+ }
+
+ LOG.info("testSlicingWithRetry ending");
+ }
+
+ @Test
+ public void testSlicingWithMaxRetriesReached() {
+ LOG.info("testSlicingWithMaxRetriesReached starting");
+
+ final BytesMessage message = new BytesMessage(new byte[]{1, 2, 3});
+ final int messageSliceSize = SerializationUtils.serialize(message).length / 2;
+ try (MessageSlicer slicer = newMessageSlicer("testSlicingWithMaxRetriesReached", messageSliceSize)) {
+ slice(slicer, IDENTIFIER, message, sendToProbe.ref(), replyToProbe.ref(), mockOnFailureCallback);
+
+ Identifier slicingId = null;
+ for (int i = 0; i < MessageSlicer.DEFAULT_MAX_SLICING_TRIES; i++) {
+ MessageSlice sliceMessage = sendToProbe.expectMsgClass(MessageSlice.class);
+ slicingId = sliceMessage.getIdentifier();
+ assertMessageSlice(sliceMessage, IDENTIFIER, 1, DONT_CARE, SlicedMessageState.INITIAL_SLICE_HASH_CODE,
+ replyToProbe.ref());
+ assembler.handleMessage(sliceMessage, sendToProbe.ref());
+
+ // Swallow the reply and send the MessageSlicer a reply with an invalid index.
+ final MessageSliceReply reply = replyToProbe.expectMsgClass(MessageSliceReply.class);
+ assertSuccessfulMessageSliceReply(reply, IDENTIFIER, sliceMessage.getSliceIndex());
+ slicer.handleMessage(MessageSliceReply.success(reply.getIdentifier(), 100000, reply.getSendTo()));
+
+ final AbortSlicing abortSlicing = sendToProbe.expectMsgClass(AbortSlicing.class);
+ assertEquals("Identifier", slicingId, abortSlicing.getIdentifier());
+ assembler.handleMessage(abortSlicing, sendToProbe.ref());
+ }
+
+ slicer.handleMessage(MessageSliceReply.success(slicingId, 100000, sendToProbe.ref()));
+
+ assertFailureCallback(RuntimeException.class);
+
+ assertFalse("MessageSlicer did not remove state for " + slicingId, slicer.hasState(slicingId));
+ assertFalse("MessageAssembler did not remove state for " + slicingId, assembler.hasState(slicingId));
+ }
+
+ LOG.info("testSlicingWithMaxRetriesReached ending");
+ }
+
+ @Test
+ public void testSlicingWithFailure() {
+ LOG.info("testSlicingWithFailure starting");
+
+ final BytesMessage message = new BytesMessage(new byte[]{1, 2, 3});
+ final int messageSliceSize = SerializationUtils.serialize(message).length / 2;
+ try (MessageSlicer slicer = newMessageSlicer("testSlicingWithFailure", messageSliceSize)) {
+ slice(slicer, IDENTIFIER, message, sendToProbe.ref(), replyToProbe.ref(), mockOnFailureCallback);
+
+ MessageSlice sliceMessage = sendToProbe.expectMsgClass(MessageSlice.class);
+
+ MessageSliceException failure = new MessageSliceException("mock failure",
+ new IOException("mock IOException"));
+ slicer.handleMessage(MessageSliceReply.failed(sliceMessage.getIdentifier(), failure, sendToProbe.ref()));
+
+ assertFailureCallback(IOException.class);
+
+ assertFalse("MessageSlicer did not remove state for " + sliceMessage.getIdentifier(),
+ slicer.hasState(sliceMessage.getIdentifier()));
+ }
+
+ LOG.info("testSlicingWithFailure ending");
+ }
+
+ @Test
+ public void testSliceWithFileBackedOutputStream() throws IOException {
+ LOG.info("testSliceWithFileBackedOutputStream starting");
+
+ final BytesMessage message = new BytesMessage(new byte[]{1, 2, 3});
+ FileBackedOutputStream fileBackedStream = FILE_BACKED_STREAM_FACTORY.newInstance();
+ try (ObjectOutputStream out = new ObjectOutputStream(fileBackedStream)) {
+ out.writeObject(message);
+ }
+
+ try (MessageSlicer slicer = newMessageSlicer("testSliceWithFileBackedOutputStream",
+ SerializationUtils.serialize(message).length)) {
+ slicer.slice(SliceOptions.builder().identifier(IDENTIFIER).fileBackedOutputStream(fileBackedStream)
+ .sendTo(ACTOR_SYSTEM.actorSelection(sendToProbe.ref().path())).replyTo(replyToProbe.ref())
+ .onFailureCallback(mockOnFailureCallback).build());
+
+ final MessageSlice sliceMessage = sendToProbe.expectMsgClass(MessageSlice.class);
+ assembler.handleMessage(sliceMessage, sendToProbe.ref());
+ assertAssembledMessage(message, replyToProbe.ref());
+ }
+
+ LOG.info("testSliceWithFileBackedOutputStream ending");
+ }
+
+ @SuppressWarnings("unchecked")
+ private void testSlicing(String logContext, int messageSliceSize, int expTotalSlices, byte[] messageData) {
+ reset(mockAssembledMessageCallback);
+
+ final BytesMessage message = new BytesMessage(messageData);
+
+ try (MessageSlicer slicer = newMessageSlicer(logContext, messageSliceSize)) {
+ slice(slicer, IDENTIFIER, message, sendToProbe.ref(), replyToProbe.ref(), mockOnFailureCallback);
+
+ Identifier slicingId = null;
+ int expLastSliceHashCode = SlicedMessageState.INITIAL_SLICE_HASH_CODE;
+ for (int sliceIndex = 1; sliceIndex <= expTotalSlices; sliceIndex++) {
+ final MessageSlice sliceMessage = sendToProbe.expectMsgClass(MessageSlice.class);
+ slicingId = sliceMessage.getIdentifier();
+ assertMessageSlice(sliceMessage, IDENTIFIER, sliceIndex, expTotalSlices, expLastSliceHashCode,
+ replyToProbe.ref());
+
+ assembler.handleMessage(sliceMessage, sendToProbe.ref());
+
+ final MessageSliceReply reply = replyToProbe.expectMsgClass(MessageSliceReply.class);
+ assertSuccessfulMessageSliceReply(reply, IDENTIFIER, sliceIndex);
+
+ expLastSliceHashCode = Arrays.hashCode(sliceMessage.getData());
+
+ slicer.handleMessage(reply);
+ }
+
+ assertAssembledMessage(message, replyToProbe.ref());
+
+ assertFalse("MessageSlicer did not remove state for " + slicingId, slicer.hasState(slicingId));
+ assertFalse("MessageAssembler did not remove state for " + slicingId, assembler.hasState(slicingId));
+ }
+ }
+
+ private void assertFailureCallback(final Class<?> exceptionType) {
+ ArgumentCaptor<Throwable> exceptionCaptor = ArgumentCaptor.forClass(Throwable.class);
+ verify(mockOnFailureCallback).accept(exceptionCaptor.capture());
+ assertEquals("Exception type", exceptionType, exceptionCaptor.getValue().getClass());
+ }
+
+ private void assertAssembledMessage(final BytesMessage message, final ActorRef sender) {
+ assertAssembledMessage(mockAssembledMessageCallback, message, sender);
+ }
+
+ static void assertAssembledMessage(final BiConsumer<Object, ActorRef> mockAssembledMessageCallback,
+ final BytesMessage message, final ActorRef sender) {
+ ArgumentCaptor<Object> assembledMessageCaptor = ArgumentCaptor.forClass(Object.class);
+ ArgumentCaptor<ActorRef> senderActorRefCaptor = ArgumentCaptor.forClass(ActorRef.class);
+ verify(mockAssembledMessageCallback).accept(assembledMessageCaptor.capture(), senderActorRefCaptor.capture());
+ assertEquals("Assembled message", message, assembledMessageCaptor.getValue());
+ assertEquals("Sender ActorRef", sender, senderActorRefCaptor.getValue());
+ }
+
+ static void assertSuccessfulMessageSliceReply(MessageSliceReply reply, Identifier identifier, int sliceIndex) {
+ assertEquals("Identifier", identifier, ((MessageSliceIdentifier)reply.getIdentifier())
+ .getClientIdentifier());
+ assertEquals("SliceIndex", sliceIndex, reply.getSliceIndex());
+ }
+
+ static void assertFailedMessageSliceReply(MessageSliceReply reply, Identifier identifier, boolean isRetriable) {
+ assertEquals("Identifier", identifier, ((MessageSliceIdentifier)reply.getIdentifier())
+ .getClientIdentifier());
+ assertEquals("Failure present", Boolean.TRUE, reply.getFailure().isPresent());
+ assertEquals("isRetriable", isRetriable, reply.getFailure().get().isRetriable());
+ }
+
+ static void assertMessageSlice(MessageSlice sliceMessage, Identifier identifier, int sliceIndex, int totalSlices,
+ int lastSliceHashCode, ActorRef replyTo) {
+ assertEquals("Identifier", identifier, ((MessageSliceIdentifier)sliceMessage.getIdentifier())
+ .getClientIdentifier());
+ assertEquals("SliceIndex", sliceIndex, sliceMessage.getSliceIndex());
+ assertEquals("LastSliceHashCode", lastSliceHashCode, sliceMessage.getLastSliceHashCode());
+ assertEquals("ReplyTo", replyTo, sliceMessage.getReplyTo());
+
+ if (totalSlices != DONT_CARE) {
+ assertEquals("TotalSlices", totalSlices, sliceMessage.getTotalSlices());
+ }
+ }
+
+ private MessageSlicer newMessageSlicer(String logContext, final int messageSliceSize) {
+ return MessageSlicer.builder().messageSliceSize(messageSliceSize).logContext(logContext)
+ .filedBackedStreamFactory(FILE_BACKED_STREAM_FACTORY).build();
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2017 Inocybe Technologies and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.messaging;
+
+import org.opendaylight.yangtools.util.AbstractStringIdentifier;
+
+/**
+ * Identifier that stores a string.
+ *
+ * @author Thomas Pantelis
+ */
+public class StringIdentifier extends AbstractStringIdentifier<StringIdentifier> {
+ private static final long serialVersionUID = 1L;
+
+ public StringIdentifier(String string) {
+ super(string);
+ }
+}