From 3582bb6dbc506b0c79dd3e4b4f791f4e17cd3103 Mon Sep 17 00:00:00 2001 From: Tom Pantelis Date: Thu, 20 Apr 2017 09:10:05 -0400 Subject: [PATCH] Bug 7449: Add message slicing/re-assembly classes Added re-usable classes for slicing messages into smaller chunks and re-assembling them. The process is similar to the current raft install snapshot chunking. The 2 main classes are MessageSlicer and MessageAssembler. The basic workflow is: - MessageSlicer#slice method is called which serializes the message, creates and caches a SlicedMessageState instance and determines the number of slices based on the serialized size and the maximum message slice size, then sends the first MessageSlice message. - The MessageAssembler on the other end receives the MessageSlice, creates an AssembledMessageState instance if necessary and appends the byte[] chunk to the assembled stream. A MessageSliceReply is returned. - The MessageSlicer receives the MessageSliceReply and sends the next MessageSlice chunk, if necessary. - Once the last MessageSlice chunk is received by the MessageAssembler, it re-assembles the original message by de-serializing the assembled stream and notifies the user-supplied callback (of type Consumer) to handle the message. Both MessageSlicer and MessageAssembler use a guava Cache and can be configured to evict state that has been inactive for a period of time, ie if a message hasn't been received by the other end. The MessageSliceReply can propagate a MessageSliceException. If the MessageSliceException indicates it's re-triable, the MessageSlicer will restart slicing from the beginning. Otherwise slicing is aborted and the user-supplied failure callback is notified. Change-Id: Iceea212b12f49c3944bade50afded92244e4b31a Signed-off-by: Tom Pantelis --- .../io/FileBackedOutputStreamFactory.java | 42 ++ .../cluster/messaging/AbortSlicing.java | 74 ++++ .../messaging/AssembledMessageState.java | 171 ++++++++ .../messaging/AssemblerClosedException.java | 28 ++ .../messaging/AssemblerSealedException.java | 26 ++ .../cluster/messaging/MessageAssembler.java | 286 ++++++++++++++ .../cluster/messaging/MessageSlice.java | 127 ++++++ .../messaging/MessageSliceException.java | 50 +++ .../messaging/MessageSliceIdentifier.java | 105 +++++ .../cluster/messaging/MessageSliceReply.java | 116 ++++++ .../cluster/messaging/MessageSlicer.java | 374 ++++++++++++++++++ .../cluster/messaging/SliceOptions.java | 196 +++++++++ .../cluster/messaging/SlicedMessageState.java | 239 +++++++++++ .../cluster/messaging/AbortSlicingTest.java | 28 ++ .../messaging/AbstractMessagingTest.java | 81 ++++ .../cluster/messaging/BytesMessage.java | 51 +++ .../messaging/MessageAssemblerTest.java | 184 +++++++++ .../messaging/MessageSliceIdentifierTest.java | 28 ++ .../messaging/MessageSliceReplyTest.java | 71 ++++ .../cluster/messaging/MessageSliceTest.java | 62 +++ .../cluster/messaging/MessageSlicerTest.java | 164 ++++++++ .../MessageSlicingIntegrationTest.java | 341 ++++++++++++++++ .../cluster/messaging/StringIdentifier.java | 23 ++ 23 files changed, 2867 insertions(+) create mode 100644 opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/io/FileBackedOutputStreamFactory.java create mode 100644 opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/messaging/AbortSlicing.java create mode 100644 opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/messaging/AssembledMessageState.java create mode 100644 opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/messaging/AssemblerClosedException.java create mode 100644 opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/messaging/AssemblerSealedException.java create mode 100644 opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/messaging/MessageAssembler.java create mode 100644 opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/messaging/MessageSlice.java create mode 100644 opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/messaging/MessageSliceException.java create mode 100644 opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/messaging/MessageSliceIdentifier.java create mode 100644 opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/messaging/MessageSliceReply.java create mode 100644 opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/messaging/MessageSlicer.java create mode 100644 opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/messaging/SliceOptions.java create mode 100644 opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/messaging/SlicedMessageState.java create mode 100644 opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/cluster/messaging/AbortSlicingTest.java create mode 100644 opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/cluster/messaging/AbstractMessagingTest.java create mode 100644 opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/cluster/messaging/BytesMessage.java create mode 100644 opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/cluster/messaging/MessageAssemblerTest.java create mode 100644 opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/cluster/messaging/MessageSliceIdentifierTest.java create mode 100644 opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/cluster/messaging/MessageSliceReplyTest.java create mode 100644 opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/cluster/messaging/MessageSliceTest.java create mode 100644 opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/cluster/messaging/MessageSlicerTest.java create mode 100644 opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/cluster/messaging/MessageSlicingIntegrationTest.java create mode 100644 opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/cluster/messaging/StringIdentifier.java diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/io/FileBackedOutputStreamFactory.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/io/FileBackedOutputStreamFactory.java new file mode 100644 index 0000000000..0cd4be67a5 --- /dev/null +++ b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/io/FileBackedOutputStreamFactory.java @@ -0,0 +1,42 @@ +/* + * Copyright (c) 2017 Inocybe Technologies and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.io; + +import javax.annotation.Nullable; + +/** + * A factory for creating {@link FileBackedOutputStream} instances. + * + * @author Thomas Pantelis + * @see FileBackedOutputStream + */ +public class FileBackedOutputStreamFactory { + private final int fileThreshold; + private final String fileDirectory; + + /** + * Constructor. + * + * @param fileThreshold the number of bytes before streams should switch to buffering to a file + * @param fileDirectory the directory in which to create files if needed. If null, the default temp file + * location is used. + */ + public FileBackedOutputStreamFactory(final int fileThreshold, final @Nullable String fileDirectory) { + this.fileThreshold = fileThreshold; + this.fileDirectory = fileDirectory; + } + + /** + * Creates a new {@link FileBackedOutputStream} with the settings configured for this factory. + * + * @return a {@link FileBackedOutputStream} instance + */ + public FileBackedOutputStream newInstance() { + return new FileBackedOutputStream(fileThreshold, fileDirectory); + } +} diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/messaging/AbortSlicing.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/messaging/AbortSlicing.java new file mode 100644 index 0000000000..b136ed7086 --- /dev/null +++ b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/messaging/AbortSlicing.java @@ -0,0 +1,74 @@ +/* + * Copyright (c) 2017 Inocybe Technologies and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.messaging; + +import com.google.common.base.Preconditions; +import java.io.Externalizable; +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; +import java.io.Serializable; +import org.opendaylight.yangtools.concepts.Identifier; + +/** + * Message sent to abort slicing. + * + * @author Thomas Pantelis + */ +class AbortSlicing implements Serializable { + private static final long serialVersionUID = 1L; + + private final Identifier identifier; + + AbortSlicing(final Identifier identifier) { + this.identifier = Preconditions.checkNotNull(identifier); + } + + Identifier getIdentifier() { + return identifier; + } + + @Override + public String toString() { + return "AbortSlicing [identifier=" + identifier + "]"; + } + + private Object writeReplace() { + return new Proxy(this); + } + + private static class Proxy implements Externalizable { + private static final long serialVersionUID = 1L; + + private AbortSlicing abortSlicing; + + // checkstyle flags the public modifier as redundant which really doesn't make sense since it clearly isn't + // redundant. It is explicitly needed for Java serialization to be able to create instances via reflection. + @SuppressWarnings("checkstyle:RedundantModifier") + public Proxy() { + } + + Proxy(AbortSlicing abortSlicing) { + this.abortSlicing = abortSlicing; + } + + @Override + public void writeExternal(ObjectOutput out) throws IOException { + out.writeObject(abortSlicing.identifier); + } + + @Override + public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + abortSlicing = new AbortSlicing((Identifier) in.readObject()); + } + + private Object readResolve() { + return abortSlicing; + } + } +} diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/messaging/AssembledMessageState.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/messaging/AssembledMessageState.java new file mode 100644 index 0000000000..16c73c7155 --- /dev/null +++ b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/messaging/AssembledMessageState.java @@ -0,0 +1,171 @@ +/* + * Copyright (c) 2017 Inocybe Technologies and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.messaging; + +import com.google.common.base.Preconditions; +import com.google.common.io.ByteSource; +import java.io.BufferedOutputStream; +import java.io.IOException; +import java.util.Arrays; +import javax.annotation.concurrent.NotThreadSafe; +import org.opendaylight.controller.cluster.io.FileBackedOutputStream; +import org.opendaylight.controller.cluster.io.FileBackedOutputStreamFactory; +import org.opendaylight.yangtools.concepts.Identifier; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Maintains the state of an assembled message. + * + * @author Thomas Pantelis + */ +@NotThreadSafe +public class AssembledMessageState implements AutoCloseable { + private static final Logger LOG = LoggerFactory.getLogger(AssembledMessageState.class); + + private final int totalSlices; + private final BufferedOutputStream bufferedStream; + private final FileBackedOutputStream fileBackedStream; + private final Identifier identifier; + private final String logContext; + + private int lastSliceIndexReceived = SlicedMessageState.FIRST_SLICE_INDEX - 1; + private int lastSliceHashCodeReceived = SlicedMessageState.INITIAL_SLICE_HASH_CODE; + private boolean sealed = false; + private boolean closed = false; + private long assembledSize; + + /** + * Constructor. + * + * @param identifier the identifier for this instance + * @param totalSlices the total number of slices to expect + * @param fileBackedStreamFactory factory for creating the FileBackedOutputStream instance used for streaming + * @param logContext the context for log messages + */ + public AssembledMessageState(final Identifier identifier, final int totalSlices, + final FileBackedOutputStreamFactory fileBackedStreamFactory, final String logContext) { + this.identifier = identifier; + this.totalSlices = totalSlices; + this.logContext = logContext; + + fileBackedStream = fileBackedStreamFactory.newInstance(); + bufferedStream = new BufferedOutputStream(fileBackedStream); + } + + /** + * Returns the identifier of this instance. + * + * @return the identifier + */ + public Identifier getIdentifier() { + return identifier; + } + + /** + * Adds a slice to the assembled stream. + * + * @param sliceIndex the index of the slice + * @param data the sliced data + * @param lastSliceHashCode the hash code of the last slice sent + * @return true if this is the last slice received, false otherwise + * @throws MessageSliceException + *
    + *
  • if the slice index is invalid
  • + *
  • if the last slice hash code is invalid
  • + *
  • if an error occurs writing the data to the stream
  • + *
+ * In addition, this instance is automatically closed and can no longer be used. + * @throws AssemblerSealedException if this instance is already sealed (ie has received all the slices) + * @throws AssemblerClosedException if this instance is already closed + */ + public boolean addSlice(final int sliceIndex, final byte[] data, final int lastSliceHashCode) + throws MessageSliceException { + if (LOG.isDebugEnabled()) { + LOG.debug("{}: addSlice: identifier: {}, sliceIndex: {}, lastSliceIndex: {}, assembledSize: {}, " + + "sliceHashCode: {}, lastSliceHashCode: {}", logContext, identifier, sliceIndex, + lastSliceIndexReceived, assembledSize, lastSliceHashCode, lastSliceHashCodeReceived); + } + + try { + validateSlice(sliceIndex, lastSliceHashCode); + + assembledSize += data.length; + lastSliceIndexReceived = sliceIndex; + lastSliceHashCodeReceived = Arrays.hashCode(data); + + bufferedStream.write(data); + + sealed = sliceIndex == totalSlices; + if (sealed) { + bufferedStream.close(); + } + } catch (IOException e) { + close(); + throw new MessageSliceException(String.format("Error writing data for slice %d of message %s", + sliceIndex, identifier), e); + } + + return sealed; + } + + /** + * Returns the assembled bytes as a ByteSource. This method must only be called after this instance is sealed. + * + * @return a ByteSource containing the assembled bytes + * @throws IOException if an error occurs obtaining the assembled bytes + * @throws IllegalStateException is this instance is not sealed + */ + public ByteSource getAssembledBytes() throws IOException { + Preconditions.checkState(sealed, "Last slice not received yet"); + return fileBackedStream.asByteSource(); + } + + private void validateSlice(final int sliceIndex, final int lastSliceHashCode) throws MessageSliceException { + if (closed) { + throw new AssemblerClosedException(identifier); + } + + if (sealed) { + throw new AssemblerSealedException(String.format( + "Received slice index for message %s but all %d expected slices have already already received.", + identifier, totalSlices)); + } + + if (lastSliceIndexReceived + 1 != sliceIndex) { + close(); + throw new MessageSliceException(String.format("Expected sliceIndex %d but got %d for message %s", + lastSliceIndexReceived + 1, sliceIndex, identifier), true); + } + + if (lastSliceHashCode != lastSliceHashCodeReceived) { + close(); + throw new MessageSliceException(String.format("The hash code of the recorded last slice (%d) does not " + + "match the senders last hash code (%d) for message %s", lastSliceHashCodeReceived, + lastSliceHashCode, identifier), true); + } + } + + @Override + public void close() { + if (closed) { + return; + } + + closed = true; + if (!sealed) { + try { + bufferedStream.close(); + } catch (IOException e) { + LOG.debug("{}: Error closing output stream", logContext, e); + } + } + + fileBackedStream.cleanup(); + } +} diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/messaging/AssemblerClosedException.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/messaging/AssemblerClosedException.java new file mode 100644 index 0000000000..83c8dcb987 --- /dev/null +++ b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/messaging/AssemblerClosedException.java @@ -0,0 +1,28 @@ +/* + * Copyright (c) 2017 Inocybe Technologies and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.messaging; + +import org.opendaylight.yangtools.concepts.Identifier; + +/** + * A MessageSliceException indicating the message assembler has already been closed. + * + * @author Thomas Pantelis + */ +public class AssemblerClosedException extends MessageSliceException { + private static final long serialVersionUID = 1L; + + /** + * Constructs an instance. + * + * @param identifier the identifier whose state was closed + */ + public AssemblerClosedException(final Identifier identifier) { + super(String.format("Message assembler for %s has already been closed", identifier), false); + } +} diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/messaging/AssemblerSealedException.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/messaging/AssemblerSealedException.java new file mode 100644 index 0000000000..df9ac638f0 --- /dev/null +++ b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/messaging/AssemblerSealedException.java @@ -0,0 +1,26 @@ +/* + * Copyright (c) 2017 Inocybe Technologies and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.messaging; + +/** + * A MessageSliceException indicating the message assembler has already been sealed. + * + * @author Thomas Pantelis + */ +public class AssemblerSealedException extends MessageSliceException { + private static final long serialVersionUID = 1L; + + /** + * Constructs an instance. + * + * @param message he detail message + */ + public AssemblerSealedException(String message) { + super(message, false); + } +} diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/messaging/MessageAssembler.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/messaging/MessageAssembler.java new file mode 100644 index 0000000000..71b07f4a7e --- /dev/null +++ b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/messaging/MessageAssembler.java @@ -0,0 +1,286 @@ +/* + * Copyright (c) 2017 Inocybe Technologies and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.messaging; + +import akka.actor.ActorRef; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.RemovalListener; +import com.google.common.cache.RemovalNotification; +import com.google.common.io.ByteSource; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.function.BiConsumer; +import javax.annotation.Nonnull; +import org.opendaylight.controller.cluster.io.FileBackedOutputStreamFactory; +import org.opendaylight.yangtools.concepts.Identifier; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This class re-assembles messages sliced into smaller chunks by {@link MessageSlicer}. + * + * @author Thomas Pantelis + * @see MessageSlicer + */ +public class MessageAssembler implements AutoCloseable { + private static final Logger LOG = LoggerFactory.getLogger(MessageAssembler.class); + + private final Cache stateCache; + private final FileBackedOutputStreamFactory filedBackedStreamFactory; + private final BiConsumer assembledMessageCallback; + private final String logContext; + + private MessageAssembler(Builder builder) { + this.filedBackedStreamFactory = Preconditions.checkNotNull(builder.filedBackedStreamFactory, + "FiledBackedStreamFactory cannot be null"); + this.assembledMessageCallback = Preconditions.checkNotNull(builder.assembledMessageCallback, + "assembledMessageCallback cannot be null"); + this.logContext = builder.logContext; + + stateCache = CacheBuilder.newBuilder() + .expireAfterAccess(builder.expireStateAfterInactivityDuration, builder.expireStateAfterInactivityUnit) + .removalListener((RemovalListener) notification -> + stateRemoved(notification)).build(); + } + + /** + * Returns a new Builder for creating MessageAssembler instances. + * + * @return a Builder instance + */ + public static Builder builder() { + return new Builder(); + } + + /** + * Checks if the given message is handled by this class. If so, it should be forwarded to the + * {@link #handleMessage(Object, ActorRef)} method + * + * @param message the message to check + * @return true if handled, false otherwise + */ + public static boolean isHandledMessage(Object message) { + return message instanceof MessageSlice || message instanceof AbortSlicing; + } + + @Override + public void close() { + LOG.debug("{}: Closing", logContext); + stateCache.invalidateAll(); + } + + /** + * Checks for and removes assembled message state that has expired due to inactivity from the slicing component + * on the other end. + */ + public void checkExpiredAssembledMessageState() { + if (stateCache.size() > 0) { + stateCache.cleanUp(); + } + } + + /** + * Invoked to handle message slices and other messages pertaining to this class. + * + * @param message the message + * @param sendTo the reference of the actor to which subsequent message slices should be sent + * @return true if the message was handled, false otherwise + */ + public boolean handleMessage(final Object message, final @Nonnull ActorRef sendTo) { + if (message instanceof MessageSlice) { + LOG.debug("{}: handleMessage: {}", logContext, message); + onMessageSlice((MessageSlice) message, sendTo); + return true; + } else if (message instanceof AbortSlicing) { + LOG.debug("{}: handleMessage: {}", logContext, message); + onAbortSlicing((AbortSlicing) message); + return true; + } + + return false; + } + + private void onMessageSlice(final MessageSlice messageSlice, final ActorRef sendTo) { + final Identifier identifier = messageSlice.getIdentifier(); + try { + final AssembledMessageState state = stateCache.get(identifier, () -> createState(messageSlice)); + processMessageSliceForState(messageSlice, state, sendTo); + } catch (ExecutionException e) { + final MessageSliceException messageSliceEx; + final Throwable cause = e.getCause(); + if (cause instanceof MessageSliceException) { + messageSliceEx = (MessageSliceException) cause; + } else { + messageSliceEx = new MessageSliceException(String.format( + "Error creating state for identifier %s", identifier), cause); + } + + messageSlice.getReplyTo().tell(MessageSliceReply.failed(identifier, messageSliceEx, sendTo), + ActorRef.noSender()); + } + } + + private AssembledMessageState createState(final MessageSlice messageSlice) throws MessageSliceException { + final Identifier identifier = messageSlice.getIdentifier(); + if (messageSlice.getSliceIndex() == SlicedMessageState.FIRST_SLICE_INDEX) { + LOG.debug("{}: Received first slice for {} - creating AssembledMessageState", logContext, identifier); + return new AssembledMessageState(identifier, messageSlice.getTotalSlices(), + filedBackedStreamFactory, logContext); + } + + LOG.debug("{}: AssembledMessageState not found for {} - returning failed reply", logContext, identifier); + throw new MessageSliceException(String.format( + "No assembled state found for identifier %s and slice index %s", identifier, + messageSlice.getSliceIndex()), true); + } + + private void processMessageSliceForState(final MessageSlice messageSlice, AssembledMessageState state, + final ActorRef sendTo) { + final Identifier identifier = messageSlice.getIdentifier(); + final ActorRef replyTo = messageSlice.getReplyTo(); + Object reAssembledMessage = null; + synchronized (state) { + final int sliceIndex = messageSlice.getSliceIndex(); + try { + final MessageSliceReply successReply = MessageSliceReply.success(identifier, sliceIndex, sendTo); + if (state.addSlice(sliceIndex, messageSlice.getData(), messageSlice.getLastSliceHashCode())) { + LOG.debug("{}: Received last slice for {}", logContext, identifier); + + reAssembledMessage = reAssembleMessage(state); + + replyTo.tell(successReply, ActorRef.noSender()); + removeState(identifier); + } else { + LOG.debug("{}: Added slice for {} - expecting more", logContext, identifier); + replyTo.tell(successReply, ActorRef.noSender()); + } + } catch (MessageSliceException e) { + LOG.warn("{}: Error processing {}", logContext, messageSlice, e); + replyTo.tell(MessageSliceReply.failed(identifier, e, sendTo), ActorRef.noSender()); + removeState(identifier); + } + } + + if (reAssembledMessage != null) { + LOG.debug("{}: Notifying callback of re-assembled message {}", logContext, reAssembledMessage); + assembledMessageCallback.accept(reAssembledMessage, replyTo); + } + } + + private Object reAssembleMessage(final AssembledMessageState state) throws MessageSliceException { + try { + final ByteSource assembledBytes = state.getAssembledBytes(); + try (ObjectInputStream in = new ObjectInputStream(assembledBytes.openStream())) { + return in.readObject(); + } + + } catch (IOException | ClassNotFoundException e) { + throw new MessageSliceException(String.format("Error re-assembling bytes for identifier %s", + state.getIdentifier()), e); + } + } + + private void onAbortSlicing(AbortSlicing message) { + removeState(message.getIdentifier()); + } + + private void removeState(final Identifier identifier) { + LOG.debug("{}: Removing state for {}", logContext, identifier); + stateCache.invalidate(identifier); + } + + private void stateRemoved(RemovalNotification notification) { + if (notification.wasEvicted()) { + LOG.warn("{}: AssembledMessageState for {} was expired from the cache", logContext, notification.getKey()); + } else { + LOG.debug("{}: AssembledMessageState for {} was removed from the cache due to {}", logContext, + notification.getKey(), notification.getCause()); + } + + notification.getValue().close(); + } + + @VisibleForTesting + boolean hasState(Identifier forIdentifier) { + boolean exists = stateCache.getIfPresent(forIdentifier) != null; + stateCache.cleanUp(); + return exists; + } + + public static class Builder { + private FileBackedOutputStreamFactory filedBackedStreamFactory; + private BiConsumer assembledMessageCallback; + private long expireStateAfterInactivityDuration = 1; + private TimeUnit expireStateAfterInactivityUnit = TimeUnit.MINUTES; + private String logContext = ""; + + /** + * Sets the factory for creating FileBackedOutputStream instances used for streaming messages. + * + * @param newFiledBackedStreamFactory the factory for creating FileBackedOutputStream instances + * @return this Builder + */ + public Builder filedBackedStreamFactory(final FileBackedOutputStreamFactory newFiledBackedStreamFactory) { + this.filedBackedStreamFactory = Preconditions.checkNotNull(newFiledBackedStreamFactory); + return this; + } + + /** + * Sets the Consumer callback for assembled messages. The callback takes the assembled message and the + * original sender ActorRef as arguments. + * + * @param newAssembledMessageCallback the Consumer callback + * @return this Builder + */ + public Builder assembledMessageCallback(final BiConsumer newAssembledMessageCallback) { + this.assembledMessageCallback = newAssembledMessageCallback; + return this; + } + + /** + * Sets the duration and time unit whereby assembled message state is purged from the cache due to + * inactivity from the slicing component on the other end. By default, state is purged after 1 minute of + * inactivity. + * + * @param duration the length of time after which a state entry is purged + * @param unit the unit the duration is expressed in + * @return this Builder + */ + public Builder expireStateAfterInactivity(final long duration, final TimeUnit unit) { + Preconditions.checkArgument(duration > 0, "duration must be > 0"); + this.expireStateAfterInactivityDuration = duration; + this.expireStateAfterInactivityUnit = unit; + return this; + } + + /** + * Sets the context for log messages. + * + * @param newLogContext the log context + * @return this Builder + */ + public Builder logContext(final String newLogContext) { + this.logContext = newLogContext; + return this; + } + + /** + * Builds a new MessageAssembler instance. + * + * @return a new MessageAssembler + */ + public MessageAssembler build() { + return new MessageAssembler(this); + } + } +} diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/messaging/MessageSlice.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/messaging/MessageSlice.java new file mode 100644 index 0000000000..00ab31d9c8 --- /dev/null +++ b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/messaging/MessageSlice.java @@ -0,0 +1,127 @@ +/* + * Copyright (c) 2017 Inocybe Technologies and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.messaging; + +import akka.actor.ActorRef; +import akka.serialization.JavaSerializer; +import akka.serialization.Serialization; +import com.google.common.base.Preconditions; +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; +import java.io.Externalizable; +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; +import java.io.Serializable; +import org.opendaylight.yangtools.concepts.Identifier; + +/** + * Represents a sliced message chunk. + * + * @author Thomas Pantelis + */ +public class MessageSlice implements Serializable { + private static final long serialVersionUID = 1L; + + private final Identifier identifier; + private final byte[] data; + private final int sliceIndex; + private final int totalSlices; + private final int lastSliceHashCode; + private final ActorRef replyTo; + + public MessageSlice(Identifier identifier, byte[] data, int sliceIndex, int totalSlices, int lastSliceHashCode, + final ActorRef replyTo) { + this.identifier = Preconditions.checkNotNull(identifier); + this.data = Preconditions.checkNotNull(data); + this.sliceIndex = sliceIndex; + this.totalSlices = totalSlices; + this.lastSliceHashCode = lastSliceHashCode; + this.replyTo = Preconditions.checkNotNull(replyTo); + } + + public Identifier getIdentifier() { + return identifier; + } + + @SuppressFBWarnings(value = "EI_EXPOSE_REP", justification = "Exposes a mutable object stored in a field but " + + "this is OK since this class is merely a DTO and does not process the byte[] internally." + + "Also it would be inefficient to create a return copy as the byte[] could be large.") + public byte[] getData() { + return data; + } + + public int getSliceIndex() { + return sliceIndex; + } + + public int getTotalSlices() { + return totalSlices; + } + + public int getLastSliceHashCode() { + return lastSliceHashCode; + } + + public ActorRef getReplyTo() { + return replyTo; + } + + @Override + public String toString() { + return "MessageSlice [identifier=" + identifier + ", data.length=" + data.length + ", sliceIndex=" + + sliceIndex + ", totalSlices=" + totalSlices + ", lastSliceHashCode=" + lastSliceHashCode + + ", replyTo=" + replyTo + "]"; + } + + private Object writeReplace() { + return new Proxy(this); + } + + private static class Proxy implements Externalizable { + private static final long serialVersionUID = 1L; + + private MessageSlice messageSlice; + + // checkstyle flags the public modifier as redundant which really doesn't make sense since it clearly isn't + // redundant. It is explicitly needed for Java serialization to be able to create instances via reflection. + @SuppressWarnings("checkstyle:RedundantModifier") + public Proxy() { + } + + Proxy(MessageSlice messageSlice) { + this.messageSlice = messageSlice; + } + + @Override + public void writeExternal(ObjectOutput out) throws IOException { + out.writeObject(messageSlice.identifier); + out.writeInt(messageSlice.sliceIndex); + out.writeInt(messageSlice.totalSlices); + out.writeInt(messageSlice.lastSliceHashCode); + out.writeObject(messageSlice.data); + out.writeObject(Serialization.serializedActorPath(messageSlice.replyTo)); + } + + @Override + public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + Identifier identifier = (Identifier) in.readObject(); + int sliceIndex = in.readInt(); + int totalSlices = in.readInt(); + int lastSliceHashCode = in.readInt(); + byte[] data = (byte[])in.readObject(); + ActorRef replyTo = JavaSerializer.currentSystem().value().provider() + .resolveActorRef((String) in.readObject()); + + messageSlice = new MessageSlice(identifier, data, sliceIndex, totalSlices, lastSliceHashCode, replyTo); + } + + private Object readResolve() { + return messageSlice; + } + } +} diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/messaging/MessageSliceException.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/messaging/MessageSliceException.java new file mode 100644 index 0000000000..09ee723b97 --- /dev/null +++ b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/messaging/MessageSliceException.java @@ -0,0 +1,50 @@ +/* + * Copyright (c) 2017 Inocybe Technologies and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.messaging; + +/** + * An exception indicating a message slice failure. + * + * @author Thomas Pantelis + */ +public class MessageSliceException extends Exception { + private static final long serialVersionUID = 1L; + + private final boolean isRetriable; + + /** + * Constructs an instance. + * + * @param message the detail message + * @param cause the cause + */ + public MessageSliceException(final String message, final Throwable cause) { + super(message, cause); + isRetriable = false; + } + + /** + * Constructs an instance. + * + * @param message the detail message + * @param isRetriable if true, indicates the original operation can be retried + */ + public MessageSliceException(final String message, final boolean isRetriable) { + super(message); + this.isRetriable = isRetriable; + } + + /** + * Returns whether or not the original operation can be retried. + * + * @return true if it can be retried, false otherwise + */ + public boolean isRetriable() { + return isRetriable; + } +} diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/messaging/MessageSliceIdentifier.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/messaging/MessageSliceIdentifier.java new file mode 100644 index 0000000000..2d1147b15f --- /dev/null +++ b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/messaging/MessageSliceIdentifier.java @@ -0,0 +1,105 @@ +/* + * Copyright (c) 2017 Inocybe Technologies and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.messaging; + +import com.google.common.base.Preconditions; +import java.io.Externalizable; +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; +import java.util.concurrent.atomic.AtomicLong; +import org.opendaylight.yangtools.concepts.Identifier; + +/** + * Identifier for a message slice that is composed of a client-supplied Identifier and an internal counter value. + * + * @author Thomas Pantelis + */ +final class MessageSliceIdentifier implements Identifier { + private static final long serialVersionUID = 1L; + private static final AtomicLong ID_COUNTER = new AtomicLong(1); + + private final Identifier clientIdentifier; + private final long messageId; + + MessageSliceIdentifier(final Identifier clientIdentifier) { + this(clientIdentifier, ID_COUNTER.getAndIncrement()); + } + + private MessageSliceIdentifier(final Identifier clientIdentifier, final long messageId) { + this.clientIdentifier = Preconditions.checkNotNull(clientIdentifier); + this.messageId = messageId; + } + + Identifier getClientIdentifier() { + return clientIdentifier; + } + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + clientIdentifier.hashCode(); + result = prime * result + (int) (messageId ^ messageId >>> 32); + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + + if (!(obj instanceof MessageSliceIdentifier)) { + return false; + } + + MessageSliceIdentifier other = (MessageSliceIdentifier) obj; + return other.clientIdentifier.equals(clientIdentifier) && other.messageId == messageId; + } + + @Override + public String toString() { + return "MessageSliceIdentifier [clientIdentifier=" + clientIdentifier + ", messageId=" + messageId + "]"; + } + + private Object writeReplace() { + return new Proxy(this); + } + + private static class Proxy implements Externalizable { + private static final long serialVersionUID = 1L; + + private MessageSliceIdentifier messageSliceId; + + // checkstyle flags the public modifier as redundant which really doesn't make sense since it clearly isn't + // redundant. It is explicitly needed for Java serialization to be able to create instances via reflection. + @SuppressWarnings("checkstyle:RedundantModifier") + public Proxy() { + } + + Proxy(MessageSliceIdentifier messageSliceId) { + this.messageSliceId = messageSliceId; + } + + @Override + public void writeExternal(ObjectOutput out) throws IOException { + out.writeObject(messageSliceId.clientIdentifier); + out.writeLong(messageSliceId.messageId); + } + + @Override + public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + messageSliceId = new MessageSliceIdentifier((Identifier) in.readObject(), in.readLong()); + } + + private Object readResolve() { + return messageSliceId; + } + } +} diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/messaging/MessageSliceReply.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/messaging/MessageSliceReply.java new file mode 100644 index 0000000000..dc80c330fd --- /dev/null +++ b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/messaging/MessageSliceReply.java @@ -0,0 +1,116 @@ +/* + * Copyright (c) 2017 Inocybe Technologies and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.messaging; + +import akka.actor.ActorRef; +import akka.serialization.JavaSerializer; +import akka.serialization.Serialization; +import com.google.common.base.Preconditions; +import java.io.Externalizable; +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; +import java.io.Serializable; +import java.util.Optional; +import org.opendaylight.yangtools.concepts.Identifier; + +/** + * The reply message for {@link MessageSlice}. + * + * @author Thomas Pantelis + */ +public class MessageSliceReply implements Serializable { + private static final long serialVersionUID = 1L; + + private final Identifier identifier; + private final int sliceIndex; + private final MessageSliceException failure; + private final ActorRef sendTo; + + private MessageSliceReply(final Identifier identifier, final int sliceIndex, final MessageSliceException failure, + final ActorRef sendTo) { + this.identifier = Preconditions.checkNotNull(identifier); + this.sliceIndex = sliceIndex; + this.sendTo = Preconditions.checkNotNull(sendTo); + this.failure = failure; + } + + public static MessageSliceReply success(final Identifier identifier, final int sliceIndex, final ActorRef sendTo) { + return new MessageSliceReply(identifier, sliceIndex, null, sendTo); + } + + public static MessageSliceReply failed(final Identifier identifier, final MessageSliceException failure, + final ActorRef sendTo) { + return new MessageSliceReply(identifier, -1, failure, sendTo); + } + + public Identifier getIdentifier() { + return identifier; + } + + public int getSliceIndex() { + return sliceIndex; + } + + public ActorRef getSendTo() { + return sendTo; + } + + public Optional getFailure() { + return Optional.ofNullable(failure); + } + + @Override + public String toString() { + return "MessageSliceReply [identifier=" + identifier + ", sliceIndex=" + sliceIndex + ", failure=" + failure + + ", sendTo=" + sendTo + "]"; + } + + private Object writeReplace() { + return new Proxy(this); + } + + private static class Proxy implements Externalizable { + private static final long serialVersionUID = 1L; + + private MessageSliceReply messageSliceReply; + + // checkstyle flags the public modifier as redundant which really doesn't make sense since it clearly isn't + // redundant. It is explicitly needed for Java serialization to be able to create instances via reflection. + @SuppressWarnings("checkstyle:RedundantModifier") + public Proxy() { + } + + Proxy(MessageSliceReply messageSliceReply) { + this.messageSliceReply = messageSliceReply; + } + + @Override + public void writeExternal(ObjectOutput out) throws IOException { + out.writeObject(messageSliceReply.identifier); + out.writeInt(messageSliceReply.sliceIndex); + out.writeObject(messageSliceReply.failure); + out.writeObject(Serialization.serializedActorPath(messageSliceReply.sendTo)); + } + + @Override + public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + final Identifier identifier = (Identifier) in.readObject(); + final int sliceIndex = in.readInt(); + final MessageSliceException failure = (MessageSliceException) in.readObject(); + ActorRef sendTo = JavaSerializer.currentSystem().value().provider() + .resolveActorRef((String) in.readObject()); + + messageSliceReply = new MessageSliceReply(identifier, sliceIndex, failure, sendTo); + } + + private Object readResolve() { + return messageSliceReply; + } + } +} diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/messaging/MessageSlicer.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/messaging/MessageSlicer.java new file mode 100644 index 0000000000..484a5c2ab9 --- /dev/null +++ b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/messaging/MessageSlicer.java @@ -0,0 +1,374 @@ +/* + * Copyright (c) 2017 Inocybe Technologies and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.messaging; + +import akka.actor.ActorRef; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.RemovalListener; +import com.google.common.cache.RemovalNotification; +import java.io.IOException; +import java.io.ObjectOutputStream; +import java.io.Serializable; +import java.util.Optional; +import java.util.concurrent.TimeUnit; +import org.opendaylight.controller.cluster.io.FileBackedOutputStream; +import org.opendaylight.controller.cluster.io.FileBackedOutputStreamFactory; +import org.opendaylight.yangtools.concepts.Identifier; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This class slices messages into smaller chunks. {@link MessageAssembler} is used to re-assemble the messages. + * + * @author Thomas Pantelis + * @see MessageAssembler + */ +public class MessageSlicer implements AutoCloseable { + private static final Logger LOG = LoggerFactory.getLogger(MessageSlicer.class); + public static final int DEFAULT_MAX_SLICING_TRIES = 3; + + private final Cache> stateCache; + private final FileBackedOutputStreamFactory filedBackedStreamFactory; + private final int messageSliceSize; + private final int maxSlicingTries; + private final String logContext; + + private MessageSlicer(Builder builder) { + this.filedBackedStreamFactory = builder.filedBackedStreamFactory; + this.messageSliceSize = builder.messageSliceSize; + this.maxSlicingTries = builder.maxSlicingTries; + this.logContext = builder.logContext; + + CacheBuilder> cacheBuilder = CacheBuilder.newBuilder().removalListener( + (RemovalListener>) notification -> stateRemoved(notification)); + if (builder.expireStateAfterInactivityDuration > 0) { + cacheBuilder = cacheBuilder.expireAfterAccess(builder.expireStateAfterInactivityDuration, + builder.expireStateAfterInactivityUnit); + } + + stateCache = cacheBuilder.build(); + } + + /** + * Returns a new Builder for creating MessageSlicer instances. + * + * @return a Builder instance + */ + public static Builder builder() { + return new Builder(); + } + + /** + * Checks if the given message is handled by this class. If so, it should be forwarded to the + * {@link #handleMessage(Object)} method + * + * @param message the message to check + * @return true if handled, false otherwise + */ + public static boolean isHandledMessage(Object message) { + return message instanceof MessageSliceReply; + } + + /** + * Slices a message into chunks based on the serialized size, the maximum message slice size and the given + * options. + * + * @param options the SliceOptions + */ + public void slice(SliceOptions options) { + final Identifier identifier = options.getIdentifier(); + final Serializable message = options.getMessage(); + final FileBackedOutputStream fileBackedStream; + if (message != null) { + LOG.debug("{}: slice: identifier: {}, message: {}", logContext, identifier, message); + + + Preconditions.checkNotNull(filedBackedStreamFactory, + "The FiledBackedStreamFactory must be set in order to call this slice method"); + + // Serialize the message to a FileBackedOutputStream. + fileBackedStream = filedBackedStreamFactory.newInstance(); + try (ObjectOutputStream out = new ObjectOutputStream(fileBackedStream)) { + out.writeObject(message); + } catch (IOException e) { + LOG.debug("{}: Error serializing message for {}", logContext, identifier, e); + fileBackedStream.cleanup(); + options.getOnFailureCallback().accept(e); + return; + } + } else { + fileBackedStream = options.getFileBackedStream(); + } + + initializeSlicing(options, fileBackedStream); + } + + private void initializeSlicing(final SliceOptions options, final FileBackedOutputStream fileBackedStream) { + final Identifier identifier = options.getIdentifier(); + MessageSliceIdentifier messageSliceId = new MessageSliceIdentifier(identifier); + SlicedMessageState state = null; + try { + state = new SlicedMessageState<>(messageSliceId, fileBackedStream, messageSliceSize, maxSlicingTries, + options.getReplyTo(), options.getOnFailureCallback(), logContext); + + final Serializable message = options.getMessage(); + if (state.getTotalSlices() == 1 && message != null) { + LOG.debug("{}: Message does not need to be sliced - sending original message", logContext); + state.close(); + sendTo(options, message, options.getReplyTo()); + return; + } + + final MessageSlice firstSlice = getNextSliceMessage(state); + + LOG.debug("{}: Sending first slice: {}", logContext, firstSlice); + + stateCache.put(messageSliceId, state); + sendTo(options, firstSlice, ActorRef.noSender()); + } catch (IOException e) { + LOG.error("{}: Error initializing SlicedMessageState for {}", logContext, identifier, e); + if (state != null) { + state.close(); + } else { + fileBackedStream.cleanup(); + } + + options.getOnFailureCallback().accept(e); + } + } + + private void sendTo(SliceOptions options, Object message, ActorRef sender) { + if (options.getSendToRef() != null) { + options.getSendToRef().tell(message, sender); + } else { + options.getSendToSelection().tell(message, sender); + } + } + + /** + * Invoked to handle messages pertaining to this class. + * + * @param message the message + * @return true if the message was handled, false otherwise + */ + public boolean handleMessage(final Object message) { + if (message instanceof MessageSliceReply) { + LOG.debug("{}: handleMessage: {}", logContext, message); + onMessageSliceReply((MessageSliceReply) message); + return true; + } + + return false; + } + + /** + * Checks for and removes sliced message state that has expired due to inactivity from the assembling component + * on the other end. + */ + public void checkExpiredSlicedMessageState() { + if (stateCache.size() > 0) { + stateCache.cleanUp(); + } + } + + /** + * Closes and removes all in-progress sliced message state. + */ + @Override + public void close() { + LOG.debug("{}: Closing", logContext); + stateCache.invalidateAll(); + } + + private MessageSlice getNextSliceMessage(SlicedMessageState state) throws IOException { + final byte[] firstSliceBytes = state.getNextSlice(); + return new MessageSlice(state.getIdentifier(), firstSliceBytes, state.getCurrentSliceIndex(), + state.getTotalSlices(), state.getLastSliceHashCode(), state.getReplyTarget()); + } + + private void onMessageSliceReply(final MessageSliceReply reply) { + final Identifier identifier = reply.getIdentifier(); + final SlicedMessageState state = stateCache.getIfPresent(identifier); + if (state == null) { + LOG.warn("{}: SlicedMessageState not found for {}", logContext, reply); + reply.getSendTo().tell(new AbortSlicing(identifier), ActorRef.noSender()); + return; + } + + synchronized (state) { + try { + final Optional failure = reply.getFailure(); + if (failure.isPresent()) { + LOG.warn("{}: Received failed {}", logContext, reply); + processMessageSliceException(failure.get(), state, reply.getSendTo()); + return; + } + + if (state.getCurrentSliceIndex() != reply.getSliceIndex()) { + LOG.warn("{}: Slice index {} in {} does not match expected index {}", logContext, + reply.getSliceIndex(), reply, state.getCurrentSliceIndex()); + reply.getSendTo().tell(new AbortSlicing(identifier), ActorRef.noSender()); + possiblyRetrySlicing(state, reply.getSendTo()); + return; + } + + if (state.isLastSlice(reply.getSliceIndex())) { + LOG.debug("{}: Received last slice reply for {}", logContext, identifier); + removeState(identifier); + } else { + final MessageSlice nextSlice = getNextSliceMessage(state); + LOG.debug("{}: Sending next slice: {}", logContext, nextSlice); + reply.getSendTo().tell(nextSlice, ActorRef.noSender()); + } + } catch (IOException e) { + LOG.warn("{}: Error processing {}", logContext, reply, e); + fail(state, e); + } + } + } + + private void processMessageSliceException(final MessageSliceException exception, + final SlicedMessageState state, final ActorRef sendTo) throws IOException { + if (exception.isRetriable()) { + possiblyRetrySlicing(state, sendTo); + } else { + fail(state, exception.getCause() != null ? exception.getCause() : exception); + } + } + + private void possiblyRetrySlicing(final SlicedMessageState state, final ActorRef sendTo) + throws IOException { + if (state.canRetry()) { + LOG.info("{}: Retrying message slicing for {}", logContext, state.getIdentifier()); + state.reset(); + sendTo.tell(getNextSliceMessage(state), ActorRef.noSender()); + } else { + String message = String.format("Maximum slicing retries reached for identifier %s - failing the message", + state.getIdentifier()); + LOG.warn(message); + fail(state, new RuntimeException(message)); + } + } + + private void removeState(final Identifier identifier) { + LOG.debug("{}: Removing state for {}", logContext, identifier); + stateCache.invalidate(identifier); + } + + private void stateRemoved(RemovalNotification> notification) { + final SlicedMessageState state = notification.getValue(); + state.close(); + if (notification.wasEvicted()) { + LOG.warn("{}: SlicedMessageState for {} was expired from the cache", logContext, notification.getKey()); + state.getOnFailureCallback().accept(new RuntimeException(String.format( + "The slicing state for message identifier %s was expired due to inactivity from the assembling " + + "component on the other end", state.getIdentifier()))); + } else { + LOG.debug("{}: SlicedMessageState for {} was removed from the cache due to {}", logContext, + notification.getKey(), notification.getCause()); + } + } + + private void fail(final SlicedMessageState state, final Throwable failure) { + removeState(state.getIdentifier()); + state.getOnFailureCallback().accept(failure); + } + + @VisibleForTesting + boolean hasState(Identifier forIdentifier) { + boolean exists = stateCache.getIfPresent(forIdentifier) != null; + stateCache.cleanUp(); + return exists; + } + + public static class Builder { + private FileBackedOutputStreamFactory filedBackedStreamFactory; + private int messageSliceSize = -1; + private long expireStateAfterInactivityDuration = -1; + private TimeUnit expireStateAfterInactivityUnit = TimeUnit.MINUTES; + private int maxSlicingTries = DEFAULT_MAX_SLICING_TRIES; + private String logContext = ""; + + /** + * Sets the factory for creating FileBackedOutputStream instances used for streaming messages. This factory + * is used by the {@link MessageSlicer#slice(SliceOptions)} method if a Serializable message is passed. + * If Serializable messages aren't passed then the factory need not be set. + * + * @param newFiledBackedStreamFactory the factory for creating FileBackedOutputStream instances + * @return this Builder + */ + public Builder filedBackedStreamFactory(final FileBackedOutputStreamFactory newFiledBackedStreamFactory) { + this.filedBackedStreamFactory = Preconditions.checkNotNull(newFiledBackedStreamFactory); + return this; + } + + /** + * Sets the maximum size (in bytes) for a message slice. + * + * @param newMessageSliceSize the maximum size (in bytes) + * @return this Builder + */ + public Builder messageSliceSize(final int newMessageSliceSize) { + Preconditions.checkArgument(newMessageSliceSize > 0, "messageSliceSize must be > 0"); + this.messageSliceSize = newMessageSliceSize; + return this; + } + + /** + * Sets the maximum number of tries for slicing a message. If exceeded, slicing fails. The default is + * defined by {@link #DEFAULT_MAX_SLICING_TRIES} + * + * @param newMaxSlicingTries the maximum number of tries + * @return this Builder + */ + public Builder maxSlicingTries(final int newMaxSlicingTries) { + Preconditions.checkArgument(newMaxSlicingTries > 0, "newMaxSlicingTries must be > 0"); + this.maxSlicingTries = newMaxSlicingTries; + return this; + } + + /** + * Sets the duration and time unit whereby sliced message state is purged from the cache and the associated + * failure callback is notified due to inactivity from the assembling component on the other end. By default, + * state is not purged due to inactivity. + * + * @param duration the length of time after which a state entry is purged + * @param unit the unit the duration is expressed in + * @return this Builder + */ + public Builder expireStateAfterInactivity(final long duration, final TimeUnit unit) { + Preconditions.checkArgument(duration > 0, "duration must be > 0"); + this.expireStateAfterInactivityDuration = duration; + this.expireStateAfterInactivityUnit = unit; + return this; + } + + /** + * Sets the context for log messages. + * + * @param newLogContext the log context + * @return this Builder + */ + public Builder logContext(final String newLogContext) { + this.logContext = Preconditions.checkNotNull(newLogContext); + return this; + } + + /** + * Builds a new MessageSlicer instance. + * + * @return a new MessageSlicer + */ + public MessageSlicer build() { + return new MessageSlicer(this); + } + } +} diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/messaging/SliceOptions.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/messaging/SliceOptions.java new file mode 100644 index 0000000000..630a554590 --- /dev/null +++ b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/messaging/SliceOptions.java @@ -0,0 +1,196 @@ +/* + * Copyright (c) 2017 Inocybe Technologies and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.messaging; + +import akka.actor.ActorRef; +import akka.actor.ActorSelection; +import com.google.common.base.Preconditions; +import java.io.Serializable; +import java.util.function.Consumer; +import org.opendaylight.controller.cluster.io.FileBackedOutputStream; +import org.opendaylight.controller.cluster.io.FileBackedOutputStreamFactory; +import org.opendaylight.yangtools.concepts.Identifier; + +/** + * Options for slicing a message with {@link MessageSlicer#slice(SliceOptions)}. + * + * @author Thomas Pantelis + */ +public class SliceOptions { + private final Builder builder; + + private SliceOptions(Builder builder) { + this.builder = builder; + } + + public Identifier getIdentifier() { + return builder.identifier; + } + + public FileBackedOutputStream getFileBackedStream() { + return builder.fileBackedStream; + } + + public Serializable getMessage() { + return builder.message; + } + + public ActorRef getSendToRef() { + return builder.sendToRef; + } + + public ActorSelection getSendToSelection() { + return builder.sendToSelection; + } + + public ActorRef getReplyTo() { + return builder.replyTo; + } + + public Consumer getOnFailureCallback() { + return builder.onFailureCallback; + } + + /** + * Returns a new Builder for creating MessageSlicer instances. + * + * @return a Builder instance + */ + public static Builder builder() { + return new Builder(); + } + + public static class Builder { + private Identifier identifier; + private FileBackedOutputStream fileBackedStream; + private Serializable message; + private ActorRef sendToRef; + private ActorSelection sendToSelection; + private ActorRef replyTo; + private Consumer onFailureCallback; + private boolean sealed; + + /** + * Sets the identifier of the component to slice. + * + * @param newIdentifier the identifier + * @return this Builder + */ + public Builder identifier(final Identifier newIdentifier) { + checkSealed(); + identifier = newIdentifier; + return this; + } + + /** + * Sets the {@link FileBackedOutputStream} containing the message data to slice. + * + * @param newFileBackedStream the {@link FileBackedOutputStream} + * @return this Builder + */ + public Builder fileBackedOutputStream(final FileBackedOutputStream newFileBackedStream) { + checkSealed(); + fileBackedStream = newFileBackedStream; + return this; + } + + /** + * Sets the message to slice. The message is first serialized to a {@link FileBackedOutputStream}. If the + * message doesn't need to be sliced, ie its serialized size is less than the maximum message slice size, then + * the original message is sent. Otherwise the first message slice is sent. + * + *

+ * Note: a {@link FileBackedOutputStreamFactory} must be set in the {@link MessageSlicer}. + * + * @param newMessage the message + * @param the Serializable message type + * @return this Builder + */ + public Builder message(final T newMessage) { + checkSealed(); + message = newMessage; + return this; + } + + /** + * Sets the reference of the actor to which to send the message slices. + * + * @param sendTo the ActorRef + * @return this Builder + */ + public Builder sendTo(final ActorRef sendTo) { + checkSealed(); + sendToRef = sendTo; + return this; + } + + /** + * Sets the ActorSelection to which to send the message slices. + * + * @param sendTo the ActorSelection + * @return this Builder + */ + public Builder sendTo(final ActorSelection sendTo) { + checkSealed(); + sendToSelection = sendTo; + return this; + } + + /** + * Sets the reference of the actor to which message slice replies should be sent. The actor should + * forward the replies to the {@link MessageSlicer#handleMessage(Object)} method. + * + * @param newReplyTo the ActorRef + * @return this Builder + */ + public Builder replyTo(final ActorRef newReplyTo) { + checkSealed(); + replyTo = newReplyTo; + return this; + } + + /** + * Sets the callback to be notified of failure. + * + * @param newOnFailureCallback the callback + * @return this Builder + */ + public Builder onFailureCallback(final Consumer newOnFailureCallback) { + checkSealed(); + onFailureCallback = newOnFailureCallback; + return this; + } + + /** + * Builds a new SliceOptions instance. + * + * @return a new SliceOptions + */ + public SliceOptions build() { + sealed = true; + + Preconditions.checkNotNull(identifier, "identifier must be set"); + Preconditions.checkNotNull(replyTo, "replyTo must be set"); + Preconditions.checkNotNull(onFailureCallback, "onFailureCallback must be set"); + Preconditions.checkState(fileBackedStream == null || message == null, + "Only one of message and fileBackedStream can be set"); + Preconditions.checkState(!(fileBackedStream == null && message == null), + "One of message and fileBackedStream must be set"); + Preconditions.checkState(sendToRef == null || sendToSelection == null, + "Only one of sendToRef and sendToSelection can be set"); + Preconditions.checkState(!(sendToRef == null && sendToSelection == null), + "One of sendToRef and sendToSelection must be set"); + + return new SliceOptions(this); + } + + protected void checkSealed() { + Preconditions.checkState(!sealed, "Builder is already sealed - further modifications are not allowed"); + } + } +} diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/messaging/SlicedMessageState.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/messaging/SlicedMessageState.java new file mode 100644 index 0000000000..8c3cb51713 --- /dev/null +++ b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/messaging/SlicedMessageState.java @@ -0,0 +1,239 @@ +/* + * Copyright (c) 2017 Inocybe Technologies and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.messaging; + +import com.google.common.io.ByteSource; +import java.io.IOException; +import java.io.InputStream; +import java.util.Arrays; +import java.util.function.Consumer; +import javax.annotation.concurrent.NotThreadSafe; +import org.opendaylight.controller.cluster.io.FileBackedOutputStream; +import org.opendaylight.yangtools.concepts.Identifier; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Maintains the state of a sliced message. + * + * @author Thomas Pantelis + * @see MessageSlicer + */ +@NotThreadSafe +public class SlicedMessageState implements AutoCloseable { + private static final Logger LOG = LoggerFactory.getLogger(SlicedMessageState.class); + + // The index of the first slice that is sent. + static final int FIRST_SLICE_INDEX = 1; + + // The initial hash code for a slice. + static final int INITIAL_SLICE_HASH_CODE = -1; + + private final Identifier identifier; + private final int messageSliceSize; + private final FileBackedOutputStream fileBackedStream; + private final T replyTarget; + private final ByteSource messageBytes; + private final int totalSlices; + private final long totalMessageSize; + private final int maxRetries; + private final Consumer onFailureCallback; + private final String logContext; + + private int currentByteOffset = 0; + private int currentSliceIndex = FIRST_SLICE_INDEX - 1; + private int lastSliceHashCode = INITIAL_SLICE_HASH_CODE; + private int currentSliceHashCode = INITIAL_SLICE_HASH_CODE; + private int tryCount = 1; + private InputStream messageInputStream; + + /** + * Constructor. + * + * @param identifier the identifier for this instance + * @param fileBackedStream the FileBackedOutputStream containing the serialized data to slice + * @param messageSliceSize the maximum size (in bytes) for a message slice + * @param maxRetries the maximum number of retries + * @param replyTarget the user-defined target for sliced message replies + * @param onFailureCallback the callback to notify on failure + * @param logContext the context for log messages + * @throws IOException if an error occurs opening the input stream + */ + public SlicedMessageState(final Identifier identifier, final FileBackedOutputStream fileBackedStream, + final int messageSliceSize, final int maxRetries, final T replyTarget, + final Consumer onFailureCallback, final String logContext) throws IOException { + this.identifier = identifier; + this.fileBackedStream = fileBackedStream; + this.messageSliceSize = messageSliceSize; + this.maxRetries = maxRetries; + this.replyTarget = replyTarget; + this.onFailureCallback = onFailureCallback; + this.logContext = logContext; + + messageBytes = fileBackedStream.asByteSource(); + totalMessageSize = messageBytes.size(); + messageInputStream = messageBytes.openStream(); + + totalSlices = (int)(totalMessageSize / messageSliceSize + (totalMessageSize % messageSliceSize > 0 ? 1 : 0)); + + LOG.debug("{}: Message size: {} bytes, total slices to send: {}", logContext, totalMessageSize, totalSlices); + } + + /** + * Returns the current slice index that has been sent. + * + * @return the current slice index that has been sent + */ + public int getCurrentSliceIndex() { + return currentSliceIndex; + } + + /** + * Returns the hash code of the last slice that was sent. + * + * @return the hash code of the last slice that was sent + */ + public int getLastSliceHashCode() { + return lastSliceHashCode; + } + + /** + * Returns the total number of slices to send. + * + * @return the total number of slices to send + */ + public int getTotalSlices() { + return totalSlices; + } + + /** + * Returns the identifier of this instance. + * + * @return the identifier + */ + public Identifier getIdentifier() { + return identifier; + } + + /** + * Returns the user-defined target for sliced message replies. + * + * @return the user-defined target + */ + public T getReplyTarget() { + return replyTarget; + } + + /** + * Returns the callback to notify on failure. + * + * @return the callback to notify on failure + */ + public Consumer getOnFailureCallback() { + return onFailureCallback; + } + + /** + * Determines if the slicing can be retried. + * + * @return true if the slicing can be retried, false if the maximum number of retries has been reached + */ + public boolean canRetry() { + return tryCount <= maxRetries; + } + + /** + * Determines if the given index is the last slice to send. + * + * @param index the slice index to test + * @return true if the index is the last slice, false otherwise + */ + public boolean isLastSlice(int index) { + return totalSlices == index; + } + + /** + * Reads and returns the next slice of data. + * + * @return the next slice of data as a byte[] + * @throws IOException if an error occurs reading the data + */ + public byte[] getNextSlice() throws IOException { + currentSliceIndex++; + final int start; + if (currentSliceIndex == FIRST_SLICE_INDEX) { + start = 0; + } else { + start = incrementByteOffset(); + } + + final int size; + if (messageSliceSize > totalMessageSize) { + size = (int) totalMessageSize; + } else if (start + messageSliceSize > totalMessageSize) { + size = (int) (totalMessageSize - start); + } else { + size = messageSliceSize; + } + + LOG.debug("{}: getNextSlice: total size: {}, offset: {}, size: {}, index: {}", logContext, totalMessageSize, + start, size, currentSliceIndex); + + byte[] nextSlice = new byte[size]; + int numRead = messageInputStream.read(nextSlice); + if (numRead != size) { + throw new IOException(String.format( + "The # of bytes read from the input stream, %d, does not match the expected # %d", numRead, size)); + } + + lastSliceHashCode = currentSliceHashCode; + currentSliceHashCode = Arrays.hashCode(nextSlice); + + return nextSlice; + } + + /** + * Resets this instance to restart slicing from the beginning. + * + * @throws IOException if an error occurs resetting the input stream + */ + public void reset() throws IOException { + closeStream(); + + tryCount++; + currentByteOffset = 0; + currentSliceIndex = FIRST_SLICE_INDEX - 1; + lastSliceHashCode = INITIAL_SLICE_HASH_CODE; + currentSliceHashCode = INITIAL_SLICE_HASH_CODE; + + messageInputStream = messageBytes.openStream(); + } + + private int incrementByteOffset() { + currentByteOffset += messageSliceSize; + return currentByteOffset; + } + + private void closeStream() { + if (messageInputStream != null) { + try { + messageInputStream.close(); + } catch (IOException e) { + LOG.warn("{}: Error closing message stream", logContext, e); + } + + messageInputStream = null; + } + } + + @Override + public void close() { + closeStream(); + fileBackedStream.cleanup(); + } +} diff --git a/opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/cluster/messaging/AbortSlicingTest.java b/opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/cluster/messaging/AbortSlicingTest.java new file mode 100644 index 0000000000..4441857f39 --- /dev/null +++ b/opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/cluster/messaging/AbortSlicingTest.java @@ -0,0 +1,28 @@ +/* + * Copyright (c) 2017 Inocybe Technologies and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.messaging; + +import static org.junit.Assert.assertEquals; + +import org.apache.commons.lang.SerializationUtils; +import org.junit.Test; + +/** + * Unit tests for AbortSlicing. + * + * @author Thomas Pantelis + */ +public class AbortSlicingTest { + + @Test + public void testSerialization() { + AbortSlicing expected = new AbortSlicing(new StringIdentifier("test")); + AbortSlicing cloned = (AbortSlicing) SerializationUtils.clone(expected); + assertEquals("getIdentifier", expected.getIdentifier(), cloned.getIdentifier()); + } +} diff --git a/opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/cluster/messaging/AbstractMessagingTest.java b/opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/cluster/messaging/AbstractMessagingTest.java new file mode 100644 index 0000000000..3d93755ee1 --- /dev/null +++ b/opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/cluster/messaging/AbstractMessagingTest.java @@ -0,0 +1,81 @@ +/* + * Copyright (c) 2017 Inocybe Technologies and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.messaging; + +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyInt; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.doReturn; + +import akka.actor.ActorSystem; +import akka.testkit.JavaTestKit; +import akka.testkit.TestProbe; +import com.google.common.io.ByteSource; +import java.io.IOException; +import java.io.InputStream; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; +import org.opendaylight.controller.cluster.io.FileBackedOutputStream; +import org.opendaylight.controller.cluster.io.FileBackedOutputStreamFactory; + +/** + * Abstract base class for messaging tests. + * + * @author Thomas Pantelis + */ +public class AbstractMessagingTest { + protected static final StringIdentifier IDENTIFIER = new StringIdentifier("test"); + protected static ActorSystem ACTOR_SYSTEM; + + protected final TestProbe testProbe = TestProbe.apply(ACTOR_SYSTEM); + + @Mock + protected FileBackedOutputStreamFactory mockFiledBackedStreamFactory; + + @Mock + protected FileBackedOutputStream mockFiledBackedStream; + + @Mock + protected ByteSource mockByteSource; + + @Mock + protected InputStream mockInputStream; + + @BeforeClass + public static void setupClass() throws IOException { + ACTOR_SYSTEM = ActorSystem.create("test"); + } + + @Before + public void setup() throws IOException { + MockitoAnnotations.initMocks(this); + + doReturn(mockFiledBackedStream).when(mockFiledBackedStreamFactory).newInstance(); + doNothing().when(mockFiledBackedStream).write(any(byte[].class), anyInt(), anyInt()); + doNothing().when(mockFiledBackedStream).write(any(byte[].class)); + doNothing().when(mockFiledBackedStream).write(anyInt()); + doNothing().when(mockFiledBackedStream).close(); + doNothing().when(mockFiledBackedStream).cleanup(); + doNothing().when(mockFiledBackedStream).flush(); + doReturn(mockByteSource).when(mockFiledBackedStream).asByteSource(); + + doReturn(mockInputStream).when(mockByteSource).openStream(); + doReturn(mockInputStream).when(mockByteSource).openBufferedStream(); + doReturn(10L).when(mockByteSource).size(); + + doReturn(0).when(mockInputStream).read(any(byte[].class)); + } + + @AfterClass + public static void tearDownClass() { + JavaTestKit.shutdownActorSystem(ACTOR_SYSTEM, Boolean.TRUE); + } +} diff --git a/opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/cluster/messaging/BytesMessage.java b/opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/cluster/messaging/BytesMessage.java new file mode 100644 index 0000000000..1805bed837 --- /dev/null +++ b/opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/cluster/messaging/BytesMessage.java @@ -0,0 +1,51 @@ +/* + * Copyright (c) 2017 Inocybe Technologies and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.messaging; + +import java.io.Serializable; +import java.util.Arrays; + +/** + * Serializable message that stores a byte[]. + * + * @author Thomas Pantelis + */ +public class BytesMessage implements Serializable { + private static final long serialVersionUID = 1L; + + private final byte[] bytes; + + public BytesMessage(byte[] bytes) { + this.bytes = Arrays.copyOf(bytes, bytes.length); + } + + @Override + public int hashCode() { + return Arrays.hashCode(bytes); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null) { + return false; + } + if (getClass() != obj.getClass()) { + return false; + } + BytesMessage other = (BytesMessage) obj; + return Arrays.equals(bytes, other.bytes); + } + + @Override + public String toString() { + return "BytesMessage [bytes=" + Arrays.toString(bytes) + "]"; + } +} diff --git a/opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/cluster/messaging/MessageAssemblerTest.java b/opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/cluster/messaging/MessageAssemblerTest.java new file mode 100644 index 0000000000..e3a68eea26 --- /dev/null +++ b/opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/cluster/messaging/MessageAssemblerTest.java @@ -0,0 +1,184 @@ +/* + * Copyright (c) 2017 Inocybe Technologies and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.messaging; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyInt; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; +import static org.opendaylight.controller.cluster.messaging.MessageSlicingIntegrationTest.assertAssembledMessage; +import static org.opendaylight.controller.cluster.messaging.MessageSlicingIntegrationTest.assertFailedMessageSliceReply; +import static org.opendaylight.controller.cluster.messaging.MessageSlicingIntegrationTest.assertSuccessfulMessageSliceReply; + +import akka.actor.ActorRef; +import com.google.common.util.concurrent.Uninterruptibles; +import java.io.IOException; +import java.util.concurrent.TimeUnit; +import java.util.function.BiConsumer; +import org.apache.commons.lang3.SerializationUtils; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mock; +import org.opendaylight.controller.cluster.io.FileBackedOutputStream; +import org.opendaylight.controller.cluster.messaging.MessageAssembler.Builder; + +/** + * Unit tests for MessageAssembler. + * + * @author Thomas Pantelis + */ +public class MessageAssemblerTest extends AbstractMessagingTest { + + @Mock + private BiConsumer mockAssembledMessageCallback; + + @Override + @Before + public void setup() throws IOException { + super.setup(); + + doNothing().when(mockAssembledMessageCallback).accept(any(Object.class), any(ActorRef.class)); + } + + @Test + public void testHandledMessages() { + final MessageSlice messageSlice = new MessageSlice(IDENTIFIER, new byte[0], 1, 1, 1, testProbe.ref()); + final AbortSlicing abortSlicing = new AbortSlicing(IDENTIFIER); + assertEquals("isHandledMessage", Boolean.TRUE, MessageAssembler.isHandledMessage(messageSlice)); + assertEquals("isHandledMessage", Boolean.TRUE, MessageAssembler.isHandledMessage(abortSlicing)); + assertEquals("isHandledMessage", Boolean.FALSE, MessageSlicer.isHandledMessage(new Object())); + + try (MessageAssembler assembler = newMessageAssembler("testHandledMessages")) { + assertEquals("handledMessage", Boolean.TRUE, assembler.handleMessage(messageSlice, testProbe.ref())); + assertEquals("handledMessage", Boolean.TRUE, assembler.handleMessage(abortSlicing, testProbe.ref())); + assertEquals("handledMessage", Boolean.FALSE, assembler.handleMessage(new Object(), testProbe.ref())); + } + } + + @Test + public void testSingleMessageSlice() { + try (MessageAssembler assembler = newMessageAssembler("testSingleMessageSlice")) { + final FileBackedOutputStream fileBackStream = spy(new FileBackedOutputStream(100000000, null)); + doReturn(fileBackStream).when(mockFiledBackedStreamFactory).newInstance(); + + final MessageSliceIdentifier identifier = new MessageSliceIdentifier(IDENTIFIER); + final BytesMessage message = new BytesMessage(new byte[]{1, 2, 3}); + + final MessageSlice messageSlice = new MessageSlice(identifier, SerializationUtils.serialize(message), 1, 1, + SlicedMessageState.INITIAL_SLICE_HASH_CODE, testProbe.ref()); + assembler.handleMessage(messageSlice, testProbe.ref()); + + final MessageSliceReply reply = testProbe.expectMsgClass(MessageSliceReply.class); + assertSuccessfulMessageSliceReply(reply, IDENTIFIER, 1); + + assertAssembledMessage(mockAssembledMessageCallback, message, testProbe.ref()); + + assertFalse("MessageAssembler did not remove state for " + identifier, assembler.hasState(identifier)); + verify(fileBackStream).cleanup(); + } + } + + @Test + public void testMessageSliceWithByteSourceFailure() throws IOException { + try (MessageAssembler assembler = newMessageAssembler("testMessageSliceWithByteSourceFailure")) { + IOException mockFailure = new IOException("mock IOException"); + doThrow(mockFailure).when(mockByteSource).openStream(); + doThrow(mockFailure).when(mockByteSource).openBufferedStream(); + + final MessageSliceIdentifier identifier = new MessageSliceIdentifier(IDENTIFIER); + final BytesMessage message = new BytesMessage(new byte[]{1, 2, 3}); + + final MessageSlice messageSlice = new MessageSlice(identifier, SerializationUtils.serialize(message), 1, 1, + SlicedMessageState.INITIAL_SLICE_HASH_CODE, testProbe.ref()); + assembler.handleMessage(messageSlice, testProbe.ref()); + + final MessageSliceReply reply = testProbe.expectMsgClass(MessageSliceReply.class); + assertFailedMessageSliceReply(reply, IDENTIFIER, false); + assertEquals("Failure cause", mockFailure, reply.getFailure().get().getCause()); + + assertFalse("MessageAssembler did not remove state for " + identifier, assembler.hasState(identifier)); + verify(mockFiledBackedStream).cleanup(); + } + } + + @Test + public void testMessageSliceWithStreamWriteFailure() throws IOException { + try (MessageAssembler assembler = newMessageAssembler("testMessageSliceWithStreamWriteFailure")) { + IOException mockFailure = new IOException("mock IOException"); + doThrow(mockFailure).when(mockFiledBackedStream).write(any(byte[].class), anyInt(), anyInt()); + doThrow(mockFailure).when(mockFiledBackedStream).write(any(byte[].class)); + doThrow(mockFailure).when(mockFiledBackedStream).write(anyInt()); + doThrow(mockFailure).when(mockFiledBackedStream).flush(); + + final MessageSliceIdentifier identifier = new MessageSliceIdentifier(IDENTIFIER); + final BytesMessage message = new BytesMessage(new byte[]{1, 2, 3}); + + final MessageSlice messageSlice = new MessageSlice(identifier, SerializationUtils.serialize(message), 1, 1, + SlicedMessageState.INITIAL_SLICE_HASH_CODE, testProbe.ref()); + assembler.handleMessage(messageSlice, testProbe.ref()); + + final MessageSliceReply reply = testProbe.expectMsgClass(MessageSliceReply.class); + assertFailedMessageSliceReply(reply, IDENTIFIER, false); + assertEquals("Failure cause", mockFailure, reply.getFailure().get().getCause()); + + assertFalse("MessageAssembler did not remove state for " + identifier, assembler.hasState(identifier)); + verify(mockFiledBackedStream).cleanup(); + } + } + + @Test + public void testAssembledMessageStateExpiration() throws IOException { + final int expiryDuration = 200; + try (MessageAssembler assembler = newMessageAssemblerBuilder("testAssembledMessageStateExpiration") + .expireStateAfterInactivity(expiryDuration, TimeUnit.MILLISECONDS).build()) { + final MessageSliceIdentifier identifier = new MessageSliceIdentifier(IDENTIFIER); + final BytesMessage message = new BytesMessage(new byte[]{1, 2, 3}); + + final MessageSlice messageSlice = new MessageSlice(identifier, SerializationUtils.serialize(message), 1, 2, + SlicedMessageState.INITIAL_SLICE_HASH_CODE, testProbe.ref()); + assembler.handleMessage(messageSlice, testProbe.ref()); + + final MessageSliceReply reply = testProbe.expectMsgClass(MessageSliceReply.class); + assertSuccessfulMessageSliceReply(reply, IDENTIFIER, 1); + + assertTrue("MessageAssembler should have remove state for " + identifier, assembler.hasState(identifier)); + Uninterruptibles.sleepUninterruptibly(expiryDuration + 50, TimeUnit.MILLISECONDS); + assertFalse("MessageAssembler did not remove state for " + identifier, assembler.hasState(identifier)); + + verify(mockFiledBackedStream).cleanup(); + } + } + + @Test + public void testFirstMessageSliceWithInvalidIndex() { + try (MessageAssembler assembler = newMessageAssembler("testFirstMessageSliceWithInvalidIndex")) { + final MessageSliceIdentifier identifier = new MessageSliceIdentifier(IDENTIFIER); + final MessageSlice messageSlice = new MessageSlice(identifier, new byte[0], 2, 3, 1, testProbe.ref()); + assembler.handleMessage(messageSlice, testProbe.ref()); + + final MessageSliceReply reply = testProbe.expectMsgClass(MessageSliceReply.class); + assertFailedMessageSliceReply(reply, IDENTIFIER, true); + assertFalse("MessageAssembler should not have state for " + identifier, assembler.hasState(identifier)); + } + } + + private MessageAssembler newMessageAssembler(String logContext) { + return newMessageAssemblerBuilder(logContext).build(); + } + + private Builder newMessageAssemblerBuilder(String logContext) { + return MessageAssembler.builder().filedBackedStreamFactory(mockFiledBackedStreamFactory) + .assembledMessageCallback(mockAssembledMessageCallback).logContext(logContext); + } +} diff --git a/opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/cluster/messaging/MessageSliceIdentifierTest.java b/opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/cluster/messaging/MessageSliceIdentifierTest.java new file mode 100644 index 0000000000..addfc535ba --- /dev/null +++ b/opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/cluster/messaging/MessageSliceIdentifierTest.java @@ -0,0 +1,28 @@ +/* + * Copyright (c) 2017 Inocybe Technologies and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.messaging; + +import static org.junit.Assert.assertEquals; + +import org.apache.commons.lang.SerializationUtils; +import org.junit.Test; + +/** + * Unit tests for MessageSliceIdentifier. + * + * @author Thomas Pantelis + */ +public class MessageSliceIdentifierTest { + + @Test + public void testSerialization() { + MessageSliceIdentifier expected = new MessageSliceIdentifier(new StringIdentifier("test")); + MessageSliceIdentifier cloned = (MessageSliceIdentifier) SerializationUtils.clone(expected); + assertEquals("cloned", expected, cloned); + } +} diff --git a/opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/cluster/messaging/MessageSliceReplyTest.java b/opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/cluster/messaging/MessageSliceReplyTest.java new file mode 100644 index 0000000000..fa31fbfeef --- /dev/null +++ b/opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/cluster/messaging/MessageSliceReplyTest.java @@ -0,0 +1,71 @@ +/* + * Copyright (c) 2017 Inocybe Technologies and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.messaging; + +import static org.junit.Assert.assertEquals; + +import akka.actor.ActorSystem; +import akka.actor.ExtendedActorSystem; +import akka.serialization.JavaSerializer; +import akka.testkit.JavaTestKit; +import akka.testkit.TestProbe; +import org.apache.commons.lang.SerializationUtils; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +/** + * Unit tests for MessageSliceReply. + * + * @author Thomas Pantelis + */ +public class MessageSliceReplyTest { + private final ActorSystem actorSystem = ActorSystem.create("test"); + + @Before + public void setUp() { + JavaSerializer.currentSystem().value_$eq((ExtendedActorSystem) actorSystem); + } + + @After + public void tearDown() { + JavaTestKit.shutdownActorSystem(actorSystem, Boolean.TRUE); + } + + @Test + public void testSerialization() { + testSuccess(); + testFailure(); + } + + private void testSuccess() { + MessageSliceReply expected = MessageSliceReply.success(new StringIdentifier("test"), 3, + TestProbe.apply(actorSystem).ref()); + MessageSliceReply cloned = (MessageSliceReply) SerializationUtils.clone(expected); + + assertEquals("getIdentifier", expected.getIdentifier(), cloned.getIdentifier()); + assertEquals("getSliceIndex", expected.getSliceIndex(), cloned.getSliceIndex()); + assertEquals("getSendTo", expected.getSendTo(), cloned.getSendTo()); + assertEquals("getFailure present", Boolean.FALSE, cloned.getFailure().isPresent()); + } + + private void testFailure() { + MessageSliceReply expected = MessageSliceReply.failed(new StringIdentifier("test"), + new MessageSliceException("mock", true), TestProbe.apply(actorSystem).ref()); + MessageSliceReply cloned = (MessageSliceReply) SerializationUtils.clone(expected); + + assertEquals("getIdentifier", expected.getIdentifier(), cloned.getIdentifier()); + assertEquals("getSliceIndex", expected.getSliceIndex(), cloned.getSliceIndex()); + assertEquals("getSendTo", expected.getSendTo(), cloned.getSendTo()); + assertEquals("getFailure present", Boolean.TRUE, cloned.getFailure().isPresent()); + assertEquals("getFailure message", expected.getFailure().get().getMessage(), + cloned.getFailure().get().getMessage()); + assertEquals("getFailure isRetriable", expected.getFailure().get().isRetriable(), + cloned.getFailure().get().isRetriable()); + } +} diff --git a/opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/cluster/messaging/MessageSliceTest.java b/opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/cluster/messaging/MessageSliceTest.java new file mode 100644 index 0000000000..6331e0b7b2 --- /dev/null +++ b/opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/cluster/messaging/MessageSliceTest.java @@ -0,0 +1,62 @@ +/* + * Copyright (c) 2017 Inocybe Technologies and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.messaging; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; + +import akka.actor.ActorSystem; +import akka.actor.ExtendedActorSystem; +import akka.serialization.JavaSerializer; +import akka.testkit.JavaTestKit; +import akka.testkit.TestProbe; +import org.apache.commons.lang.SerializationUtils; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +/** + * Unit tests for MessageSlice. + * + * @author Thomas Pantelis + */ +public class MessageSliceTest { + private final ActorSystem actorSystem = ActorSystem.create("test"); + + @Before + public void setUp() { + JavaSerializer.currentSystem().value_$eq((ExtendedActorSystem) actorSystem); + } + + @After + public void tearDown() { + JavaTestKit.shutdownActorSystem(actorSystem, Boolean.TRUE); + } + + @Test + public void testSerialization() { + byte[] data = new byte[1000]; + for (int i = 0, j = 0; i < data.length; i++) { + data[i] = (byte)j; + if (++j >= 255) { + j = 0; + } + } + + MessageSlice expected = new MessageSlice(new StringIdentifier("test"), data, 2, 3, 54321, + TestProbe.apply(actorSystem).ref()); + MessageSlice cloned = (MessageSlice) SerializationUtils.clone(expected); + + assertEquals("getIdentifier", expected.getIdentifier(), cloned.getIdentifier()); + assertEquals("getSliceIndex", expected.getSliceIndex(), cloned.getSliceIndex()); + assertEquals("getTotalSlices", expected.getTotalSlices(), cloned.getTotalSlices()); + assertEquals("getLastSliceHashCode", expected.getLastSliceHashCode(), cloned.getLastSliceHashCode()); + assertArrayEquals("getData", expected.getData(), cloned.getData()); + assertEquals("getReplyTo", expected.getReplyTo(), cloned.getReplyTo()); + } +} diff --git a/opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/cluster/messaging/MessageSlicerTest.java b/opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/cluster/messaging/MessageSlicerTest.java new file mode 100644 index 0000000000..20b6cab818 --- /dev/null +++ b/opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/cluster/messaging/MessageSlicerTest.java @@ -0,0 +1,164 @@ +/* + * Copyright (c) 2017 Inocybe Technologies and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.messaging; + +import static org.junit.Assert.assertEquals; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyInt; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; + +import akka.actor.ActorRef; +import com.google.common.util.concurrent.Uninterruptibles; +import java.io.IOException; +import java.io.Serializable; +import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; +import org.junit.Before; +import org.junit.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.opendaylight.yangtools.concepts.Identifier; + +/** + * Unit tests for MessageSlicer. + * + * @author Thomas Pantelis + */ +public class MessageSlicerTest extends AbstractMessagingTest { + @Mock + private Consumer mockOnFailureCallback; + + @Override + @Before + public void setup() throws IOException { + super.setup(); + + doNothing().when(mockOnFailureCallback).accept(any(Throwable.class)); + } + + @Test + public void testHandledMessages() { + final MessageSliceReply reply = MessageSliceReply.success(IDENTIFIER, 1, testProbe.ref()); + assertEquals("isHandledMessage", Boolean.TRUE, MessageSlicer.isHandledMessage(reply)); + assertEquals("isHandledMessage", Boolean.FALSE, MessageSlicer.isHandledMessage(new Object())); + + try (MessageSlicer slicer = newMessageSlicer("testHandledMessages", 100)) { + assertEquals("handledMessage", Boolean.TRUE, slicer.handleMessage(reply)); + assertEquals("handledMessage", Boolean.FALSE, slicer.handleMessage(new Object())); + } + } + + @Test + public void testSliceWithFailedSerialization() throws IOException { + IOException mockFailure = new IOException("mock IOException"); + doThrow(mockFailure).when(mockFiledBackedStream).write(any(byte[].class), anyInt(), anyInt()); + doThrow(mockFailure).when(mockFiledBackedStream).write(any(byte[].class)); + doThrow(mockFailure).when(mockFiledBackedStream).write(anyInt()); + doThrow(mockFailure).when(mockFiledBackedStream).flush(); + + try (MessageSlicer slicer = newMessageSlicer("testSliceWithFailedSerialization", 100)) { + slice(slicer, IDENTIFIER, new BytesMessage(new byte[]{}), testProbe.ref(), testProbe.ref(), + mockOnFailureCallback); + + assertFailureCallback(IOException.class); + verify(mockFiledBackedStream).cleanup(); + } + } + + @Test + public void testSliceWithByteSourceFailure() throws IOException { + IOException mockFailure = new IOException("mock IOException"); + doThrow(mockFailure).when(mockByteSource).openStream(); + doThrow(mockFailure).when(mockByteSource).openBufferedStream(); + + try (MessageSlicer slicer = newMessageSlicer("testSliceWithByteSourceFailure", 100)) { + slice(slicer, IDENTIFIER, new BytesMessage(new byte[]{}), testProbe.ref(), testProbe.ref(), + mockOnFailureCallback); + + assertFailureCallback(IOException.class); + verify(mockFiledBackedStream).cleanup(); + } + } + + @Test + public void testSliceWithInputStreamFailure() throws IOException { + doReturn(0).when(mockInputStream).read(any(byte[].class)); + + try (MessageSlicer slicer = newMessageSlicer("testSliceWithInputStreamFailure", 2)) { + slice(slicer, IDENTIFIER, new BytesMessage(new byte[]{}), testProbe.ref(), testProbe.ref(), + mockOnFailureCallback); + + assertFailureCallback(IOException.class); + verify(mockFiledBackedStream).cleanup(); + } + } + + @Test + public void testMessageSliceReplyWithNoState() { + try (MessageSlicer slicer = newMessageSlicer("testMessageSliceReplyWithNoState", 1000)) { + slicer.handleMessage(MessageSliceReply.success(IDENTIFIER, 1, testProbe.ref())); + final AbortSlicing abortSlicing = testProbe.expectMsgClass(AbortSlicing.class); + assertEquals("Identifier", IDENTIFIER, abortSlicing.getIdentifier()); + } + } + + @Test + public void testCloseAllSlicedMessageState() throws IOException { + doReturn(1).when(mockInputStream).read(any(byte[].class)); + + final MessageSlicer slicer = newMessageSlicer("testCloseAllSlicedMessageState", 1); + slice(slicer, IDENTIFIER, new BytesMessage(new byte[]{1, 2}), testProbe.ref(), testProbe.ref(), + mockOnFailureCallback); + + slicer.close(); + + verify(mockFiledBackedStream).cleanup(); + verifyNoMoreInteractions(mockOnFailureCallback); + } + + @Test + public void testCheckExpiredSlicedMessageState() throws IOException { + doReturn(1).when(mockInputStream).read(any(byte[].class)); + + final int expiryDuration = 200; + try (MessageSlicer slicer = MessageSlicer.builder().messageSliceSize(1) + .logContext("testCheckExpiredSlicedMessageState") + .filedBackedStreamFactory(mockFiledBackedStreamFactory) + .expireStateAfterInactivity(expiryDuration, TimeUnit.MILLISECONDS).build()) { + slice(slicer, IDENTIFIER, new BytesMessage(new byte[]{1, 2}), testProbe.ref(), testProbe.ref(), + mockOnFailureCallback); + + Uninterruptibles.sleepUninterruptibly(expiryDuration + 50, TimeUnit.MILLISECONDS); + slicer.checkExpiredSlicedMessageState(); + + assertFailureCallback(RuntimeException.class); + verify(mockFiledBackedStream).cleanup(); + } + } + + private void assertFailureCallback(final Class exceptionType) { + ArgumentCaptor exceptionCaptor = ArgumentCaptor.forClass(Throwable.class); + verify(mockOnFailureCallback).accept(exceptionCaptor.capture()); + assertEquals("Exception type", exceptionType, exceptionCaptor.getValue().getClass()); + } + + private MessageSlicer newMessageSlicer(String logContext, final int messageSliceSize) { + return MessageSlicer.builder().messageSliceSize(messageSliceSize).logContext(logContext) + .filedBackedStreamFactory(mockFiledBackedStreamFactory).build(); + } + + static void slice(MessageSlicer slicer, Identifier identifier, Serializable message, ActorRef sendTo, + ActorRef replyTo, Consumer onFailureCallback) { + slicer.slice(SliceOptions.builder().identifier(identifier).message(message).sendTo(sendTo).replyTo(replyTo) + .onFailureCallback(onFailureCallback).build()); + } +} diff --git a/opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/cluster/messaging/MessageSlicingIntegrationTest.java b/opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/cluster/messaging/MessageSlicingIntegrationTest.java new file mode 100644 index 0000000000..a490a24789 --- /dev/null +++ b/opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/cluster/messaging/MessageSlicingIntegrationTest.java @@ -0,0 +1,341 @@ +/* + * Copyright (c) 2017 Inocybe Technologies and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.messaging; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.reset; +import static org.mockito.Mockito.verify; +import static org.opendaylight.controller.cluster.messaging.MessageSlicerTest.slice; + +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.testkit.JavaTestKit; +import akka.testkit.TestProbe; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.ObjectOutputStream; +import java.util.Arrays; +import java.util.function.BiConsumer; +import java.util.function.Consumer; +import org.apache.commons.lang3.SerializationUtils; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.Test; +import org.mockito.ArgumentCaptor; +import org.opendaylight.controller.cluster.io.FileBackedOutputStream; +import org.opendaylight.controller.cluster.io.FileBackedOutputStreamFactory; +import org.opendaylight.yangtools.concepts.Identifier; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * End-to-end integration tests for message slicing. + * + * @author Thomas Pantelis + */ +public class MessageSlicingIntegrationTest { + private static final Logger LOG = LoggerFactory.getLogger(MessageSlicingIntegrationTest.class); + + private static final ActorSystem ACTOR_SYSTEM = ActorSystem.create("test"); + private static final FileBackedOutputStreamFactory FILE_BACKED_STREAM_FACTORY = + new FileBackedOutputStreamFactory(1000000000, "target"); + private static final Identifier IDENTIFIER = new StringIdentifier("stringId"); + private static final int DONT_CARE = -1; + + private final TestProbe sendToProbe = TestProbe.apply(ACTOR_SYSTEM); + private final TestProbe replyToProbe = TestProbe.apply(ACTOR_SYSTEM); + + @SuppressWarnings("unchecked") + private final Consumer mockOnFailureCallback = mock(Consumer.class); + + @SuppressWarnings("unchecked") + private final BiConsumer mockAssembledMessageCallback = mock(BiConsumer.class); + + private final MessageAssembler assembler = MessageAssembler.builder() + .assembledMessageCallback(mockAssembledMessageCallback).logContext("test") + .filedBackedStreamFactory(FILE_BACKED_STREAM_FACTORY).build(); + + @Before + public void setup() { + doNothing().when(mockOnFailureCallback).accept(any(Throwable.class)); + doNothing().when(mockAssembledMessageCallback).accept(any(Object.class), any(ActorRef.class)); + } + + @After + public void tearDown() { + assembler.close(); + } + + @AfterClass + public static void staticTearDown() { + JavaTestKit.shutdownActorSystem(ACTOR_SYSTEM, Boolean.TRUE); + } + + @Test + public void testSlicingWithChunks() throws IOException { + LOG.info("testSlicingWithChunks starting"); + + // First slice a message where the messageSliceSize divides evenly into the serialized size. + + byte[] emptyMessageBytes = SerializationUtils.serialize(new BytesMessage(new byte[]{})); + int messageSliceSize = 10; + int expTotalSlices = emptyMessageBytes.length / messageSliceSize; + ByteArrayOutputStream byteStream = new ByteArrayOutputStream(); + if (emptyMessageBytes.length % messageSliceSize > 0) { + expTotalSlices++; + int padding = messageSliceSize - emptyMessageBytes.length % messageSliceSize; + byte value = 1; + for (int i = 0; i < padding; i++, value++) { + byteStream.write(value); + } + } + + testSlicing("testSlicingWithChunks", messageSliceSize, expTotalSlices, byteStream.toByteArray()); + + // Now slice a message where the messageSliceSize doesn't divide evenly. + + byteStream.write(new byte[]{100, 101, 102}); + testSlicing("testSlicingWithChunks", messageSliceSize, expTotalSlices + 1, byteStream.toByteArray()); + + LOG.info("testSlicingWithChunks ending"); + } + + @Test + public void testSingleSlice() { + LOG.info("testSingleSlice starting"); + + // Slice a message where the serialized size is equal to the messageSliceSize. In this case it should + // just send the original message. + + final BytesMessage message = new BytesMessage(new byte[]{1, 2, 3}); + try (MessageSlicer slicer = newMessageSlicer("testSingleSlice", SerializationUtils.serialize(message).length)) { + slice(slicer, IDENTIFIER, message, sendToProbe.ref(), replyToProbe.ref(), mockOnFailureCallback); + + final BytesMessage sentMessage = sendToProbe.expectMsgClass(BytesMessage.class); + assertEquals("Sent message", message, sentMessage); + } + + LOG.info("testSingleSlice ending"); + } + + @Test + public void testSlicingWithRetry() { + LOG.info("testSlicingWithRetry starting"); + + final BytesMessage message = new BytesMessage(new byte[]{1, 2, 3}); + final int messageSliceSize = SerializationUtils.serialize(message).length / 2; + try (MessageSlicer slicer = newMessageSlicer("testSlicingWithRetry", messageSliceSize)) { + slice(slicer, IDENTIFIER, message, sendToProbe.ref(), replyToProbe.ref(), mockOnFailureCallback); + + MessageSlice sliceMessage = sendToProbe.expectMsgClass(MessageSlice.class); + assembler.handleMessage(sliceMessage, sendToProbe.ref()); + + // Swallow the reply and send the MessageSlice again - it should return a failed reply. + replyToProbe.expectMsgClass(MessageSliceReply.class); + assembler.handleMessage(sliceMessage, sendToProbe.ref()); + + final MessageSliceReply failedReply = replyToProbe.expectMsgClass(MessageSliceReply.class); + assertFailedMessageSliceReply(failedReply, IDENTIFIER, true); + + // Send the failed reply - slicing should be retried from the beginning. + + slicer.handleMessage(failedReply); + while (true) { + sliceMessage = sendToProbe.expectMsgClass(MessageSlice.class); + assembler.handleMessage(sliceMessage, sendToProbe.ref()); + + final MessageSliceReply reply = replyToProbe.expectMsgClass(MessageSliceReply.class); + assertSuccessfulMessageSliceReply(reply, IDENTIFIER, sliceMessage.getSliceIndex()); + slicer.handleMessage(reply); + + if (reply.getSliceIndex() == sliceMessage.getTotalSlices()) { + break; + } + } + + assertAssembledMessage(message, replyToProbe.ref()); + } + + LOG.info("testSlicingWithRetry ending"); + } + + @Test + public void testSlicingWithMaxRetriesReached() { + LOG.info("testSlicingWithMaxRetriesReached starting"); + + final BytesMessage message = new BytesMessage(new byte[]{1, 2, 3}); + final int messageSliceSize = SerializationUtils.serialize(message).length / 2; + try (MessageSlicer slicer = newMessageSlicer("testSlicingWithMaxRetriesReached", messageSliceSize)) { + slice(slicer, IDENTIFIER, message, sendToProbe.ref(), replyToProbe.ref(), mockOnFailureCallback); + + Identifier slicingId = null; + for (int i = 0; i < MessageSlicer.DEFAULT_MAX_SLICING_TRIES; i++) { + MessageSlice sliceMessage = sendToProbe.expectMsgClass(MessageSlice.class); + slicingId = sliceMessage.getIdentifier(); + assertMessageSlice(sliceMessage, IDENTIFIER, 1, DONT_CARE, SlicedMessageState.INITIAL_SLICE_HASH_CODE, + replyToProbe.ref()); + assembler.handleMessage(sliceMessage, sendToProbe.ref()); + + // Swallow the reply and send the MessageSlicer a reply with an invalid index. + final MessageSliceReply reply = replyToProbe.expectMsgClass(MessageSliceReply.class); + assertSuccessfulMessageSliceReply(reply, IDENTIFIER, sliceMessage.getSliceIndex()); + slicer.handleMessage(MessageSliceReply.success(reply.getIdentifier(), 100000, reply.getSendTo())); + + final AbortSlicing abortSlicing = sendToProbe.expectMsgClass(AbortSlicing.class); + assertEquals("Identifier", slicingId, abortSlicing.getIdentifier()); + assembler.handleMessage(abortSlicing, sendToProbe.ref()); + } + + slicer.handleMessage(MessageSliceReply.success(slicingId, 100000, sendToProbe.ref())); + + assertFailureCallback(RuntimeException.class); + + assertFalse("MessageSlicer did not remove state for " + slicingId, slicer.hasState(slicingId)); + assertFalse("MessageAssembler did not remove state for " + slicingId, assembler.hasState(slicingId)); + } + + LOG.info("testSlicingWithMaxRetriesReached ending"); + } + + @Test + public void testSlicingWithFailure() { + LOG.info("testSlicingWithFailure starting"); + + final BytesMessage message = new BytesMessage(new byte[]{1, 2, 3}); + final int messageSliceSize = SerializationUtils.serialize(message).length / 2; + try (MessageSlicer slicer = newMessageSlicer("testSlicingWithFailure", messageSliceSize)) { + slice(slicer, IDENTIFIER, message, sendToProbe.ref(), replyToProbe.ref(), mockOnFailureCallback); + + MessageSlice sliceMessage = sendToProbe.expectMsgClass(MessageSlice.class); + + MessageSliceException failure = new MessageSliceException("mock failure", + new IOException("mock IOException")); + slicer.handleMessage(MessageSliceReply.failed(sliceMessage.getIdentifier(), failure, sendToProbe.ref())); + + assertFailureCallback(IOException.class); + + assertFalse("MessageSlicer did not remove state for " + sliceMessage.getIdentifier(), + slicer.hasState(sliceMessage.getIdentifier())); + } + + LOG.info("testSlicingWithFailure ending"); + } + + @Test + public void testSliceWithFileBackedOutputStream() throws IOException { + LOG.info("testSliceWithFileBackedOutputStream starting"); + + final BytesMessage message = new BytesMessage(new byte[]{1, 2, 3}); + FileBackedOutputStream fileBackedStream = FILE_BACKED_STREAM_FACTORY.newInstance(); + try (ObjectOutputStream out = new ObjectOutputStream(fileBackedStream)) { + out.writeObject(message); + } + + try (MessageSlicer slicer = newMessageSlicer("testSliceWithFileBackedOutputStream", + SerializationUtils.serialize(message).length)) { + slicer.slice(SliceOptions.builder().identifier(IDENTIFIER).fileBackedOutputStream(fileBackedStream) + .sendTo(ACTOR_SYSTEM.actorSelection(sendToProbe.ref().path())).replyTo(replyToProbe.ref()) + .onFailureCallback(mockOnFailureCallback).build()); + + final MessageSlice sliceMessage = sendToProbe.expectMsgClass(MessageSlice.class); + assembler.handleMessage(sliceMessage, sendToProbe.ref()); + assertAssembledMessage(message, replyToProbe.ref()); + } + + LOG.info("testSliceWithFileBackedOutputStream ending"); + } + + @SuppressWarnings("unchecked") + private void testSlicing(String logContext, int messageSliceSize, int expTotalSlices, byte[] messageData) { + reset(mockAssembledMessageCallback); + + final BytesMessage message = new BytesMessage(messageData); + + try (MessageSlicer slicer = newMessageSlicer(logContext, messageSliceSize)) { + slice(slicer, IDENTIFIER, message, sendToProbe.ref(), replyToProbe.ref(), mockOnFailureCallback); + + Identifier slicingId = null; + int expLastSliceHashCode = SlicedMessageState.INITIAL_SLICE_HASH_CODE; + for (int sliceIndex = 1; sliceIndex <= expTotalSlices; sliceIndex++) { + final MessageSlice sliceMessage = sendToProbe.expectMsgClass(MessageSlice.class); + slicingId = sliceMessage.getIdentifier(); + assertMessageSlice(sliceMessage, IDENTIFIER, sliceIndex, expTotalSlices, expLastSliceHashCode, + replyToProbe.ref()); + + assembler.handleMessage(sliceMessage, sendToProbe.ref()); + + final MessageSliceReply reply = replyToProbe.expectMsgClass(MessageSliceReply.class); + assertSuccessfulMessageSliceReply(reply, IDENTIFIER, sliceIndex); + + expLastSliceHashCode = Arrays.hashCode(sliceMessage.getData()); + + slicer.handleMessage(reply); + } + + assertAssembledMessage(message, replyToProbe.ref()); + + assertFalse("MessageSlicer did not remove state for " + slicingId, slicer.hasState(slicingId)); + assertFalse("MessageAssembler did not remove state for " + slicingId, assembler.hasState(slicingId)); + } + } + + private void assertFailureCallback(final Class exceptionType) { + ArgumentCaptor exceptionCaptor = ArgumentCaptor.forClass(Throwable.class); + verify(mockOnFailureCallback).accept(exceptionCaptor.capture()); + assertEquals("Exception type", exceptionType, exceptionCaptor.getValue().getClass()); + } + + private void assertAssembledMessage(final BytesMessage message, final ActorRef sender) { + assertAssembledMessage(mockAssembledMessageCallback, message, sender); + } + + static void assertAssembledMessage(final BiConsumer mockAssembledMessageCallback, + final BytesMessage message, final ActorRef sender) { + ArgumentCaptor assembledMessageCaptor = ArgumentCaptor.forClass(Object.class); + ArgumentCaptor senderActorRefCaptor = ArgumentCaptor.forClass(ActorRef.class); + verify(mockAssembledMessageCallback).accept(assembledMessageCaptor.capture(), senderActorRefCaptor.capture()); + assertEquals("Assembled message", message, assembledMessageCaptor.getValue()); + assertEquals("Sender ActorRef", sender, senderActorRefCaptor.getValue()); + } + + static void assertSuccessfulMessageSliceReply(MessageSliceReply reply, Identifier identifier, int sliceIndex) { + assertEquals("Identifier", identifier, ((MessageSliceIdentifier)reply.getIdentifier()) + .getClientIdentifier()); + assertEquals("SliceIndex", sliceIndex, reply.getSliceIndex()); + } + + static void assertFailedMessageSliceReply(MessageSliceReply reply, Identifier identifier, boolean isRetriable) { + assertEquals("Identifier", identifier, ((MessageSliceIdentifier)reply.getIdentifier()) + .getClientIdentifier()); + assertEquals("Failure present", Boolean.TRUE, reply.getFailure().isPresent()); + assertEquals("isRetriable", isRetriable, reply.getFailure().get().isRetriable()); + } + + static void assertMessageSlice(MessageSlice sliceMessage, Identifier identifier, int sliceIndex, int totalSlices, + int lastSliceHashCode, ActorRef replyTo) { + assertEquals("Identifier", identifier, ((MessageSliceIdentifier)sliceMessage.getIdentifier()) + .getClientIdentifier()); + assertEquals("SliceIndex", sliceIndex, sliceMessage.getSliceIndex()); + assertEquals("LastSliceHashCode", lastSliceHashCode, sliceMessage.getLastSliceHashCode()); + assertEquals("ReplyTo", replyTo, sliceMessage.getReplyTo()); + + if (totalSlices != DONT_CARE) { + assertEquals("TotalSlices", totalSlices, sliceMessage.getTotalSlices()); + } + } + + private MessageSlicer newMessageSlicer(String logContext, final int messageSliceSize) { + return MessageSlicer.builder().messageSliceSize(messageSliceSize).logContext(logContext) + .filedBackedStreamFactory(FILE_BACKED_STREAM_FACTORY).build(); + } +} diff --git a/opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/cluster/messaging/StringIdentifier.java b/opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/cluster/messaging/StringIdentifier.java new file mode 100644 index 0000000000..6686e01762 --- /dev/null +++ b/opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/cluster/messaging/StringIdentifier.java @@ -0,0 +1,23 @@ +/* + * Copyright (c) 2017 Inocybe Technologies and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.messaging; + +import org.opendaylight.yangtools.util.AbstractStringIdentifier; + +/** + * Identifier that stores a string. + * + * @author Thomas Pantelis + */ +public class StringIdentifier extends AbstractStringIdentifier { + private static final long serialVersionUID = 1L; + + public StringIdentifier(String string) { + super(string); + } +} -- 2.36.6