X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-clustering-commons%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fmessaging%2FMessageSlicer.java;h=4c3d67bdaaca759d0e81acee561dff09b8bab27b;hp=ec7dcca06a3106448c91eb0b7091fc24e28e3aa1;hb=d7c9a8ccfcb57f005490a226803d094289997ef9;hpb=d8be13e36b4a5fca3155e7ab3e840ba9ab5a75b1 diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/messaging/MessageSlicer.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/messaging/MessageSlicer.java index ec7dcca06a..4c3d67bdaa 100644 --- a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/messaging/MessageSlicer.java +++ b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/messaging/MessageSlicer.java @@ -17,9 +17,12 @@ import com.google.common.cache.RemovalNotification; import java.io.IOException; import java.io.ObjectOutputStream; import java.io.Serializable; +import java.util.Iterator; import java.util.Optional; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Predicate; +import javax.annotation.Nonnull; import org.opendaylight.controller.cluster.io.FileBackedOutputStream; import org.opendaylight.controller.cluster.io.FileBackedOutputStreamFactory; import org.opendaylight.yangtools.concepts.Identifier; @@ -37,7 +40,7 @@ public class MessageSlicer implements AutoCloseable { private static final AtomicLong SLICER_ID_COUNTER = new AtomicLong(1); public static final int DEFAULT_MAX_SLICING_TRIES = 3; - private final Cache> stateCache; + private final Cache> stateCache; private final FileBackedOutputStreamFactory fileBackedStreamFactory; private final int messageSliceSize; private final int maxSlicingTries; @@ -92,8 +95,9 @@ public class MessageSlicer implements AutoCloseable { * options. * * @param options the SliceOptions + * @return true if the message was sliced, false otherwise */ - public void slice(final SliceOptions options) { + public boolean slice(final SliceOptions options) { final Identifier identifier = options.getIdentifier(); final Serializable message = options.getMessage(); final FileBackedOutputStream fileBackedStream; @@ -111,16 +115,16 @@ public class MessageSlicer implements AutoCloseable { LOG.debug("{}: Error serializing message for {}", logContext, identifier, e); fileBackedStream.cleanup(); options.getOnFailureCallback().accept(e); - return; + return false; } } else { fileBackedStream = options.getFileBackedStream(); } - initializeSlicing(options, fileBackedStream); + return initializeSlicing(options, fileBackedStream); } - private void initializeSlicing(final SliceOptions options, final FileBackedOutputStream fileBackedStream) { + private boolean initializeSlicing(final SliceOptions options, final FileBackedOutputStream fileBackedStream) { final Identifier identifier = options.getIdentifier(); MessageSliceIdentifier messageSliceId = new MessageSliceIdentifier(identifier, id); SlicedMessageState state = null; @@ -133,7 +137,7 @@ public class MessageSlicer implements AutoCloseable { LOG.debug("{}: Message does not need to be sliced - sending original message", logContext); state.close(); sendTo(options, message, options.getReplyTo()); - return; + return false; } final MessageSlice firstSlice = getNextSliceMessage(state); @@ -142,6 +146,7 @@ public class MessageSlicer implements AutoCloseable { stateCache.put(messageSliceId, state); sendTo(options, firstSlice, ActorRef.noSender()); + return true; } catch (IOException e) { LOG.error("{}: Error initializing SlicedMessageState for {}", logContext, identifier, e); if (state != null) { @@ -151,6 +156,7 @@ public class MessageSlicer implements AutoCloseable { } options.getOnFailureCallback().accept(e); + return false; } } @@ -196,6 +202,20 @@ public class MessageSlicer implements AutoCloseable { stateCache.invalidateAll(); } + /** + * Cancels all in-progress sliced message state that matches the given filter. + * + * @param filter filters by Identifier + */ + public void cancelSlicing(@Nonnull final Predicate filter) { + final Iterator iter = stateCache.asMap().keySet().iterator(); + while (iter.hasNext()) { + if (filter.test(iter.next().getClientIdentifier())) { + iter.remove(); + } + } + } + private static MessageSlice getNextSliceMessage(final SlicedMessageState state) throws IOException { final byte[] firstSliceBytes = state.getNextSlice(); return new MessageSlice(state.getIdentifier(), firstSliceBytes, state.getCurrentSliceIndex(),