X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-clustering-commons%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fmessaging%2FMessageSlicer.java;h=57a6f9ed4f44d48ffb2720126b32c21627222b3a;hp=1454e72cc063d2323ca4844cb9e8dcb5ae7e3d9f;hb=b4bf55727093657662d8c16a50fa85f87978a586;hpb=da0c512a2fc26de5a161b13eb91650af23450829 diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/messaging/MessageSlicer.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/messaging/MessageSlicer.java index 1454e72cc0..57a6f9ed4f 100644 --- a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/messaging/MessageSlicer.java +++ b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/messaging/MessageSlicer.java @@ -7,12 +7,13 @@ */ package org.opendaylight.controller.cluster.messaging; +import static com.google.common.base.Preconditions.checkArgument; +import static java.util.Objects.requireNonNull; + import akka.actor.ActorRef; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; -import com.google.common.cache.RemovalListener; import com.google.common.cache.RemovalNotification; import java.io.IOException; import java.io.ObjectOutputStream; @@ -20,6 +21,8 @@ import java.io.Serializable; import java.util.Optional; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Predicate; +import org.eclipse.jdt.annotation.NonNull; import org.opendaylight.controller.cluster.io.FileBackedOutputStream; import org.opendaylight.controller.cluster.io.FileBackedOutputStreamFactory; import org.opendaylight.yangtools.concepts.Identifier; @@ -37,14 +40,14 @@ public class MessageSlicer implements AutoCloseable { private static final AtomicLong SLICER_ID_COUNTER = new AtomicLong(1); public static final int DEFAULT_MAX_SLICING_TRIES = 3; - private final Cache> stateCache; + private final Cache> stateCache; private final FileBackedOutputStreamFactory fileBackedStreamFactory; private final int messageSliceSize; private final int maxSlicingTries; private final String logContext; private final long id; - private MessageSlicer(Builder builder) { + MessageSlicer(final Builder builder) { this.fileBackedStreamFactory = builder.fileBackedStreamFactory; this.messageSliceSize = builder.messageSliceSize; this.maxSlicingTries = builder.maxSlicingTries; @@ -52,13 +55,12 @@ public class MessageSlicer implements AutoCloseable { id = SLICER_ID_COUNTER.getAndIncrement(); this.logContext = builder.logContext + "_slicer-id-" + id; - CacheBuilder> cacheBuilder = CacheBuilder.newBuilder().removalListener( - (RemovalListener>) notification -> stateRemoved(notification)); + CacheBuilder> cacheBuilder = + CacheBuilder.newBuilder().removalListener(this::stateRemoved); if (builder.expireStateAfterInactivityDuration > 0) { cacheBuilder = cacheBuilder.expireAfterAccess(builder.expireStateAfterInactivityDuration, builder.expireStateAfterInactivityUnit); } - stateCache = cacheBuilder.build(); } @@ -83,7 +85,7 @@ public class MessageSlicer implements AutoCloseable { * @param message the message to check * @return true if handled, false otherwise */ - public static boolean isHandledMessage(Object message) { + public static boolean isHandledMessage(final Object message) { return message instanceof MessageSliceReply; } @@ -92,15 +94,16 @@ public class MessageSlicer implements AutoCloseable { * options. * * @param options the SliceOptions + * @return true if the message was sliced, false otherwise */ - public void slice(SliceOptions options) { + public boolean slice(final SliceOptions options) { final Identifier identifier = options.getIdentifier(); final Serializable message = options.getMessage(); final FileBackedOutputStream fileBackedStream; if (message != null) { LOG.debug("{}: slice: identifier: {}, message: {}", logContext, identifier, message); - Preconditions.checkNotNull(fileBackedStreamFactory, + requireNonNull(fileBackedStreamFactory, "The FiledBackedStreamFactory must be set in order to call this slice method"); // Serialize the message to a FileBackedOutputStream. @@ -111,16 +114,16 @@ public class MessageSlicer implements AutoCloseable { LOG.debug("{}: Error serializing message for {}", logContext, identifier, e); fileBackedStream.cleanup(); options.getOnFailureCallback().accept(e); - return; + return false; } } else { fileBackedStream = options.getFileBackedStream(); } - initializeSlicing(options, fileBackedStream); + return initializeSlicing(options, fileBackedStream); } - private void initializeSlicing(final SliceOptions options, final FileBackedOutputStream fileBackedStream) { + private boolean initializeSlicing(final SliceOptions options, final FileBackedOutputStream fileBackedStream) { final Identifier identifier = options.getIdentifier(); MessageSliceIdentifier messageSliceId = new MessageSliceIdentifier(identifier, id); SlicedMessageState state = null; @@ -133,7 +136,7 @@ public class MessageSlicer implements AutoCloseable { LOG.debug("{}: Message does not need to be sliced - sending original message", logContext); state.close(); sendTo(options, message, options.getReplyTo()); - return; + return false; } final MessageSlice firstSlice = getNextSliceMessage(state); @@ -142,6 +145,7 @@ public class MessageSlicer implements AutoCloseable { stateCache.put(messageSliceId, state); sendTo(options, firstSlice, ActorRef.noSender()); + return true; } catch (IOException e) { LOG.error("{}: Error initializing SlicedMessageState for {}", logContext, identifier, e); if (state != null) { @@ -151,10 +155,11 @@ public class MessageSlicer implements AutoCloseable { } options.getOnFailureCallback().accept(e); + return false; } } - private void sendTo(SliceOptions options, Object message, ActorRef sender) { + private static void sendTo(final SliceOptions options, final Object message, final ActorRef sender) { if (options.getSendToRef() != null) { options.getSendToRef().tell(message, sender); } else { @@ -196,7 +201,17 @@ public class MessageSlicer implements AutoCloseable { stateCache.invalidateAll(); } - private MessageSlice getNextSliceMessage(SlicedMessageState state) throws IOException { + /** + * Cancels all in-progress sliced message state that matches the given filter. + * + * @param filter filters by Identifier + */ + public void cancelSlicing(final @NonNull Predicate filter) { + stateCache.asMap().keySet().removeIf( + messageSliceIdentifier -> filter.test(messageSliceIdentifier.getClientIdentifier())); + } + + private static MessageSlice getNextSliceMessage(final SlicedMessageState state) throws IOException { final byte[] firstSliceBytes = state.getNextSlice(); return new MessageSlice(state.getIdentifier(), firstSliceBytes, state.getCurrentSliceIndex(), state.getTotalSlices(), state.getLastSliceHashCode(), state.getReplyTarget()); @@ -278,7 +293,7 @@ public class MessageSlicer implements AutoCloseable { stateCache.invalidate(identifier); } - private void stateRemoved(RemovalNotification> notification) { + private void stateRemoved(final RemovalNotification> notification) { final SlicedMessageState state = notification.getValue(); state.close(); if (notification.wasEvicted()) { @@ -298,7 +313,7 @@ public class MessageSlicer implements AutoCloseable { } @VisibleForTesting - boolean hasState(Identifier forIdentifier) { + boolean hasState(final Identifier forIdentifier) { boolean exists = stateCache.getIfPresent(forIdentifier) != null; stateCache.cleanUp(); return exists; @@ -321,7 +336,7 @@ public class MessageSlicer implements AutoCloseable { * @return this Builder */ public Builder fileBackedStreamFactory(final FileBackedOutputStreamFactory newFileBackedStreamFactory) { - this.fileBackedStreamFactory = Preconditions.checkNotNull(newFileBackedStreamFactory); + this.fileBackedStreamFactory = requireNonNull(newFileBackedStreamFactory); return this; } @@ -332,7 +347,7 @@ public class MessageSlicer implements AutoCloseable { * @return this Builder */ public Builder messageSliceSize(final int newMessageSliceSize) { - Preconditions.checkArgument(newMessageSliceSize > 0, "messageSliceSize must be > 0"); + checkArgument(newMessageSliceSize > 0, "messageSliceSize must be > 0"); this.messageSliceSize = newMessageSliceSize; return this; } @@ -345,7 +360,7 @@ public class MessageSlicer implements AutoCloseable { * @return this Builder */ public Builder maxSlicingTries(final int newMaxSlicingTries) { - Preconditions.checkArgument(newMaxSlicingTries > 0, "newMaxSlicingTries must be > 0"); + checkArgument(newMaxSlicingTries > 0, "newMaxSlicingTries must be > 0"); this.maxSlicingTries = newMaxSlicingTries; return this; } @@ -360,7 +375,7 @@ public class MessageSlicer implements AutoCloseable { * @return this Builder */ public Builder expireStateAfterInactivity(final long duration, final TimeUnit unit) { - Preconditions.checkArgument(duration > 0, "duration must be > 0"); + checkArgument(duration > 0, "duration must be > 0"); this.expireStateAfterInactivityDuration = duration; this.expireStateAfterInactivityUnit = unit; return this; @@ -373,7 +388,7 @@ public class MessageSlicer implements AutoCloseable { * @return this Builder */ public Builder logContext(final String newLogContext) { - this.logContext = Preconditions.checkNotNull(newLogContext); + this.logContext = requireNonNull(newLogContext); return this; }