X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-clustering-commons%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fmessaging%2FMessageSlicer.java;h=57a6f9ed4f44d48ffb2720126b32c21627222b3a;hb=a60f577d66eb510232b0e2ccca73d9e7a81af0c9;hp=484a5c2ab9f7d52fe6aa8ea41cff5a81d3d1f040;hpb=3582bb6dbc506b0c79dd3e4b4f791f4e17cd3103;p=controller.git diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/messaging/MessageSlicer.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/messaging/MessageSlicer.java index 484a5c2ab9..57a6f9ed4f 100644 --- a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/messaging/MessageSlicer.java +++ b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/messaging/MessageSlicer.java @@ -7,18 +7,22 @@ */ package org.opendaylight.controller.cluster.messaging; +import static com.google.common.base.Preconditions.checkArgument; +import static java.util.Objects.requireNonNull; + import akka.actor.ActorRef; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; -import com.google.common.cache.RemovalListener; import com.google.common.cache.RemovalNotification; import java.io.IOException; import java.io.ObjectOutputStream; import java.io.Serializable; import java.util.Optional; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Predicate; +import org.eclipse.jdt.annotation.NonNull; import org.opendaylight.controller.cluster.io.FileBackedOutputStream; import org.opendaylight.controller.cluster.io.FileBackedOutputStreamFactory; import org.opendaylight.yangtools.concepts.Identifier; @@ -33,30 +37,38 @@ import org.slf4j.LoggerFactory; */ public class MessageSlicer implements AutoCloseable { private static final Logger LOG = LoggerFactory.getLogger(MessageSlicer.class); + private static final AtomicLong SLICER_ID_COUNTER = new AtomicLong(1); public static final int DEFAULT_MAX_SLICING_TRIES = 3; - private final Cache> stateCache; - private final FileBackedOutputStreamFactory filedBackedStreamFactory; + private final Cache> stateCache; + private final FileBackedOutputStreamFactory fileBackedStreamFactory; private final int messageSliceSize; private final int maxSlicingTries; private final String logContext; + private final long id; - private MessageSlicer(Builder builder) { - this.filedBackedStreamFactory = builder.filedBackedStreamFactory; + MessageSlicer(final Builder builder) { + this.fileBackedStreamFactory = builder.fileBackedStreamFactory; this.messageSliceSize = builder.messageSliceSize; this.maxSlicingTries = builder.maxSlicingTries; - this.logContext = builder.logContext; - CacheBuilder> cacheBuilder = CacheBuilder.newBuilder().removalListener( - (RemovalListener>) notification -> stateRemoved(notification)); + id = SLICER_ID_COUNTER.getAndIncrement(); + this.logContext = builder.logContext + "_slicer-id-" + id; + + CacheBuilder> cacheBuilder = + CacheBuilder.newBuilder().removalListener(this::stateRemoved); if (builder.expireStateAfterInactivityDuration > 0) { cacheBuilder = cacheBuilder.expireAfterAccess(builder.expireStateAfterInactivityDuration, builder.expireStateAfterInactivityUnit); } - stateCache = cacheBuilder.build(); } + @VisibleForTesting + long getId() { + return id; + } + /** * Returns a new Builder for creating MessageSlicer instances. * @@ -73,7 +85,7 @@ public class MessageSlicer implements AutoCloseable { * @param message the message to check * @return true if handled, false otherwise */ - public static boolean isHandledMessage(Object message) { + public static boolean isHandledMessage(final Object message) { return message instanceof MessageSliceReply; } @@ -82,38 +94,38 @@ public class MessageSlicer implements AutoCloseable { * options. * * @param options the SliceOptions + * @return true if the message was sliced, false otherwise */ - public void slice(SliceOptions options) { + public boolean slice(final SliceOptions options) { final Identifier identifier = options.getIdentifier(); final Serializable message = options.getMessage(); final FileBackedOutputStream fileBackedStream; if (message != null) { LOG.debug("{}: slice: identifier: {}, message: {}", logContext, identifier, message); - - Preconditions.checkNotNull(filedBackedStreamFactory, + requireNonNull(fileBackedStreamFactory, "The FiledBackedStreamFactory must be set in order to call this slice method"); // Serialize the message to a FileBackedOutputStream. - fileBackedStream = filedBackedStreamFactory.newInstance(); + fileBackedStream = fileBackedStreamFactory.newInstance(); try (ObjectOutputStream out = new ObjectOutputStream(fileBackedStream)) { out.writeObject(message); } catch (IOException e) { LOG.debug("{}: Error serializing message for {}", logContext, identifier, e); fileBackedStream.cleanup(); options.getOnFailureCallback().accept(e); - return; + return false; } } else { fileBackedStream = options.getFileBackedStream(); } - initializeSlicing(options, fileBackedStream); + return initializeSlicing(options, fileBackedStream); } - private void initializeSlicing(final SliceOptions options, final FileBackedOutputStream fileBackedStream) { + private boolean initializeSlicing(final SliceOptions options, final FileBackedOutputStream fileBackedStream) { final Identifier identifier = options.getIdentifier(); - MessageSliceIdentifier messageSliceId = new MessageSliceIdentifier(identifier); + MessageSliceIdentifier messageSliceId = new MessageSliceIdentifier(identifier, id); SlicedMessageState state = null; try { state = new SlicedMessageState<>(messageSliceId, fileBackedStream, messageSliceSize, maxSlicingTries, @@ -124,7 +136,7 @@ public class MessageSlicer implements AutoCloseable { LOG.debug("{}: Message does not need to be sliced - sending original message", logContext); state.close(); sendTo(options, message, options.getReplyTo()); - return; + return false; } final MessageSlice firstSlice = getNextSliceMessage(state); @@ -133,6 +145,7 @@ public class MessageSlicer implements AutoCloseable { stateCache.put(messageSliceId, state); sendTo(options, firstSlice, ActorRef.noSender()); + return true; } catch (IOException e) { LOG.error("{}: Error initializing SlicedMessageState for {}", logContext, identifier, e); if (state != null) { @@ -142,10 +155,11 @@ public class MessageSlicer implements AutoCloseable { } options.getOnFailureCallback().accept(e); + return false; } } - private void sendTo(SliceOptions options, Object message, ActorRef sender) { + private static void sendTo(final SliceOptions options, final Object message, final ActorRef sender) { if (options.getSendToRef() != null) { options.getSendToRef().tell(message, sender); } else { @@ -162,8 +176,7 @@ public class MessageSlicer implements AutoCloseable { public boolean handleMessage(final Object message) { if (message instanceof MessageSliceReply) { LOG.debug("{}: handleMessage: {}", logContext, message); - onMessageSliceReply((MessageSliceReply) message); - return true; + return onMessageSliceReply((MessageSliceReply) message); } return false; @@ -188,19 +201,34 @@ public class MessageSlicer implements AutoCloseable { stateCache.invalidateAll(); } - private MessageSlice getNextSliceMessage(SlicedMessageState state) throws IOException { + /** + * Cancels all in-progress sliced message state that matches the given filter. + * + * @param filter filters by Identifier + */ + public void cancelSlicing(final @NonNull Predicate filter) { + stateCache.asMap().keySet().removeIf( + messageSliceIdentifier -> filter.test(messageSliceIdentifier.getClientIdentifier())); + } + + private static MessageSlice getNextSliceMessage(final SlicedMessageState state) throws IOException { final byte[] firstSliceBytes = state.getNextSlice(); return new MessageSlice(state.getIdentifier(), firstSliceBytes, state.getCurrentSliceIndex(), state.getTotalSlices(), state.getLastSliceHashCode(), state.getReplyTarget()); } - private void onMessageSliceReply(final MessageSliceReply reply) { + private boolean onMessageSliceReply(final MessageSliceReply reply) { final Identifier identifier = reply.getIdentifier(); + if (!(identifier instanceof MessageSliceIdentifier) + || ((MessageSliceIdentifier)identifier).getSlicerId() != id) { + return false; + } + final SlicedMessageState state = stateCache.getIfPresent(identifier); if (state == null) { LOG.warn("{}: SlicedMessageState not found for {}", logContext, reply); reply.getSendTo().tell(new AbortSlicing(identifier), ActorRef.noSender()); - return; + return true; } synchronized (state) { @@ -209,7 +237,7 @@ public class MessageSlicer implements AutoCloseable { if (failure.isPresent()) { LOG.warn("{}: Received failed {}", logContext, reply); processMessageSliceException(failure.get(), state, reply.getSendTo()); - return; + return true; } if (state.getCurrentSliceIndex() != reply.getSliceIndex()) { @@ -217,7 +245,7 @@ public class MessageSlicer implements AutoCloseable { reply.getSliceIndex(), reply, state.getCurrentSliceIndex()); reply.getSendTo().tell(new AbortSlicing(identifier), ActorRef.noSender()); possiblyRetrySlicing(state, reply.getSendTo()); - return; + return true; } if (state.isLastSlice(reply.getSliceIndex())) { @@ -233,6 +261,8 @@ public class MessageSlicer implements AutoCloseable { fail(state, e); } } + + return true; } private void processMessageSliceException(final MessageSliceException exception, @@ -263,7 +293,7 @@ public class MessageSlicer implements AutoCloseable { stateCache.invalidate(identifier); } - private void stateRemoved(RemovalNotification> notification) { + private void stateRemoved(final RemovalNotification> notification) { final SlicedMessageState state = notification.getValue(); state.close(); if (notification.wasEvicted()) { @@ -283,14 +313,14 @@ public class MessageSlicer implements AutoCloseable { } @VisibleForTesting - boolean hasState(Identifier forIdentifier) { + boolean hasState(final Identifier forIdentifier) { boolean exists = stateCache.getIfPresent(forIdentifier) != null; stateCache.cleanUp(); return exists; } public static class Builder { - private FileBackedOutputStreamFactory filedBackedStreamFactory; + private FileBackedOutputStreamFactory fileBackedStreamFactory; private int messageSliceSize = -1; private long expireStateAfterInactivityDuration = -1; private TimeUnit expireStateAfterInactivityUnit = TimeUnit.MINUTES; @@ -302,11 +332,11 @@ public class MessageSlicer implements AutoCloseable { * is used by the {@link MessageSlicer#slice(SliceOptions)} method if a Serializable message is passed. * If Serializable messages aren't passed then the factory need not be set. * - * @param newFiledBackedStreamFactory the factory for creating FileBackedOutputStream instances + * @param newFileBackedStreamFactory the factory for creating FileBackedOutputStream instances * @return this Builder */ - public Builder filedBackedStreamFactory(final FileBackedOutputStreamFactory newFiledBackedStreamFactory) { - this.filedBackedStreamFactory = Preconditions.checkNotNull(newFiledBackedStreamFactory); + public Builder fileBackedStreamFactory(final FileBackedOutputStreamFactory newFileBackedStreamFactory) { + this.fileBackedStreamFactory = requireNonNull(newFileBackedStreamFactory); return this; } @@ -317,7 +347,7 @@ public class MessageSlicer implements AutoCloseable { * @return this Builder */ public Builder messageSliceSize(final int newMessageSliceSize) { - Preconditions.checkArgument(newMessageSliceSize > 0, "messageSliceSize must be > 0"); + checkArgument(newMessageSliceSize > 0, "messageSliceSize must be > 0"); this.messageSliceSize = newMessageSliceSize; return this; } @@ -330,7 +360,7 @@ public class MessageSlicer implements AutoCloseable { * @return this Builder */ public Builder maxSlicingTries(final int newMaxSlicingTries) { - Preconditions.checkArgument(newMaxSlicingTries > 0, "newMaxSlicingTries must be > 0"); + checkArgument(newMaxSlicingTries > 0, "newMaxSlicingTries must be > 0"); this.maxSlicingTries = newMaxSlicingTries; return this; } @@ -345,7 +375,7 @@ public class MessageSlicer implements AutoCloseable { * @return this Builder */ public Builder expireStateAfterInactivity(final long duration, final TimeUnit unit) { - Preconditions.checkArgument(duration > 0, "duration must be > 0"); + checkArgument(duration > 0, "duration must be > 0"); this.expireStateAfterInactivityDuration = duration; this.expireStateAfterInactivityUnit = unit; return this; @@ -358,7 +388,7 @@ public class MessageSlicer implements AutoCloseable { * @return this Builder */ public Builder logContext(final String newLogContext) { - this.logContext = Preconditions.checkNotNull(newLogContext); + this.logContext = requireNonNull(newLogContext); return this; }