Rename ValueTypes to LithiumValue
[controller.git] / opendaylight / md-sal / sal-clustering-commons / src / main / java / org / opendaylight / controller / cluster / messaging / MessageSlicer.java
index 484a5c2ab9f7d52fe6aa8ea41cff5a81d3d1f040..57a6f9ed4f44d48ffb2720126b32c21627222b3a 100644 (file)
@@ -7,18 +7,22 @@
  */
 package org.opendaylight.controller.cluster.messaging;
 
+import static com.google.common.base.Preconditions.checkArgument;
+import static java.util.Objects.requireNonNull;
+
 import akka.actor.ActorRef;
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
 import com.google.common.cache.Cache;
 import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.RemovalListener;
 import com.google.common.cache.RemovalNotification;
 import java.io.IOException;
 import java.io.ObjectOutputStream;
 import java.io.Serializable;
 import java.util.Optional;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Predicate;
+import org.eclipse.jdt.annotation.NonNull;
 import org.opendaylight.controller.cluster.io.FileBackedOutputStream;
 import org.opendaylight.controller.cluster.io.FileBackedOutputStreamFactory;
 import org.opendaylight.yangtools.concepts.Identifier;
@@ -33,30 +37,38 @@ import org.slf4j.LoggerFactory;
  */
 public class MessageSlicer implements AutoCloseable {
     private static final Logger LOG = LoggerFactory.getLogger(MessageSlicer.class);
+    private static final AtomicLong SLICER_ID_COUNTER = new AtomicLong(1);
     public static final int DEFAULT_MAX_SLICING_TRIES = 3;
 
-    private final Cache<Identifier, SlicedMessageState<ActorRef>> stateCache;
-    private final FileBackedOutputStreamFactory filedBackedStreamFactory;
+    private final Cache<MessageSliceIdentifier, SlicedMessageState<ActorRef>> stateCache;
+    private final FileBackedOutputStreamFactory fileBackedStreamFactory;
     private final int messageSliceSize;
     private final int maxSlicingTries;
     private final String logContext;
+    private final long id;
 
-    private MessageSlicer(Builder builder) {
-        this.filedBackedStreamFactory = builder.filedBackedStreamFactory;
+    MessageSlicer(final Builder builder) {
+        this.fileBackedStreamFactory = builder.fileBackedStreamFactory;
         this.messageSliceSize = builder.messageSliceSize;
         this.maxSlicingTries = builder.maxSlicingTries;
-        this.logContext = builder.logContext;
 
-        CacheBuilder<Identifier, SlicedMessageState<ActorRef>> cacheBuilder = CacheBuilder.newBuilder().removalListener(
-                (RemovalListener<Identifier, SlicedMessageState<ActorRef>>) notification -> stateRemoved(notification));
+        id = SLICER_ID_COUNTER.getAndIncrement();
+        this.logContext = builder.logContext + "_slicer-id-" + id;
+
+        CacheBuilder<Identifier, SlicedMessageState<ActorRef>> cacheBuilder =
+                CacheBuilder.newBuilder().removalListener(this::stateRemoved);
         if (builder.expireStateAfterInactivityDuration > 0) {
             cacheBuilder = cacheBuilder.expireAfterAccess(builder.expireStateAfterInactivityDuration,
                     builder.expireStateAfterInactivityUnit);
         }
-
         stateCache = cacheBuilder.build();
     }
 
+    @VisibleForTesting
+    long getId() {
+        return id;
+    }
+
     /**
      * Returns a new Builder for creating MessageSlicer instances.
      *
@@ -73,7 +85,7 @@ public class MessageSlicer implements AutoCloseable {
      * @param message the message to check
      * @return true if handled, false otherwise
      */
-    public static boolean isHandledMessage(Object message) {
+    public static boolean isHandledMessage(final Object message) {
         return message instanceof MessageSliceReply;
     }
 
@@ -82,38 +94,38 @@ public class MessageSlicer implements AutoCloseable {
      * options.
      *
      * @param options the SliceOptions
+     * @return true if the message was sliced, false otherwise
      */
-    public void slice(SliceOptions options) {
+    public boolean slice(final SliceOptions options) {
         final Identifier identifier = options.getIdentifier();
         final Serializable message = options.getMessage();
         final FileBackedOutputStream fileBackedStream;
         if (message != null) {
             LOG.debug("{}: slice: identifier: {}, message: {}", logContext, identifier, message);
 
-
-            Preconditions.checkNotNull(filedBackedStreamFactory,
+            requireNonNull(fileBackedStreamFactory,
                     "The FiledBackedStreamFactory must be set in order to call this slice method");
 
             // Serialize the message to a FileBackedOutputStream.
-            fileBackedStream = filedBackedStreamFactory.newInstance();
+            fileBackedStream = fileBackedStreamFactory.newInstance();
             try (ObjectOutputStream out = new ObjectOutputStream(fileBackedStream)) {
                 out.writeObject(message);
             } catch (IOException e) {
                 LOG.debug("{}: Error serializing message for {}", logContext, identifier, e);
                 fileBackedStream.cleanup();
                 options.getOnFailureCallback().accept(e);
-                return;
+                return false;
             }
         } else {
             fileBackedStream = options.getFileBackedStream();
         }
 
-        initializeSlicing(options, fileBackedStream);
+        return initializeSlicing(options, fileBackedStream);
     }
 
-    private void initializeSlicing(final SliceOptions options, final FileBackedOutputStream fileBackedStream) {
+    private boolean initializeSlicing(final SliceOptions options, final FileBackedOutputStream fileBackedStream) {
         final Identifier identifier = options.getIdentifier();
-        MessageSliceIdentifier messageSliceId = new MessageSliceIdentifier(identifier);
+        MessageSliceIdentifier messageSliceId = new MessageSliceIdentifier(identifier, id);
         SlicedMessageState<ActorRef> state = null;
         try {
             state = new SlicedMessageState<>(messageSliceId, fileBackedStream, messageSliceSize, maxSlicingTries,
@@ -124,7 +136,7 @@ public class MessageSlicer implements AutoCloseable {
                 LOG.debug("{}: Message does not need to be sliced - sending original message", logContext);
                 state.close();
                 sendTo(options, message, options.getReplyTo());
-                return;
+                return false;
             }
 
             final MessageSlice firstSlice = getNextSliceMessage(state);
@@ -133,6 +145,7 @@ public class MessageSlicer implements AutoCloseable {
 
             stateCache.put(messageSliceId, state);
             sendTo(options, firstSlice, ActorRef.noSender());
+            return true;
         } catch (IOException e) {
             LOG.error("{}: Error initializing SlicedMessageState for {}", logContext, identifier, e);
             if (state != null) {
@@ -142,10 +155,11 @@ public class MessageSlicer implements AutoCloseable {
             }
 
             options.getOnFailureCallback().accept(e);
+            return false;
         }
     }
 
-    private void sendTo(SliceOptions options, Object message, ActorRef sender) {
+    private static void sendTo(final SliceOptions options, final Object message, final ActorRef sender) {
         if (options.getSendToRef() != null) {
             options.getSendToRef().tell(message, sender);
         } else {
@@ -162,8 +176,7 @@ public class MessageSlicer implements AutoCloseable {
     public boolean handleMessage(final Object message) {
         if (message instanceof MessageSliceReply) {
             LOG.debug("{}: handleMessage: {}", logContext, message);
-            onMessageSliceReply((MessageSliceReply) message);
-            return true;
+            return onMessageSliceReply((MessageSliceReply) message);
         }
 
         return false;
@@ -188,19 +201,34 @@ public class MessageSlicer implements AutoCloseable {
         stateCache.invalidateAll();
     }
 
-    private MessageSlice getNextSliceMessage(SlicedMessageState<ActorRef> state) throws IOException {
+    /**
+     * Cancels all in-progress sliced message state that matches the given filter.
+     *
+     * @param filter filters by Identifier
+     */
+    public void cancelSlicing(final @NonNull Predicate<Identifier> filter) {
+        stateCache.asMap().keySet().removeIf(
+            messageSliceIdentifier -> filter.test(messageSliceIdentifier.getClientIdentifier()));
+    }
+
+    private static MessageSlice getNextSliceMessage(final SlicedMessageState<ActorRef> state) throws IOException {
         final byte[] firstSliceBytes = state.getNextSlice();
         return new MessageSlice(state.getIdentifier(), firstSliceBytes, state.getCurrentSliceIndex(),
                 state.getTotalSlices(), state.getLastSliceHashCode(), state.getReplyTarget());
     }
 
-    private void onMessageSliceReply(final MessageSliceReply reply) {
+    private boolean onMessageSliceReply(final MessageSliceReply reply) {
         final Identifier identifier = reply.getIdentifier();
+        if (!(identifier instanceof MessageSliceIdentifier)
+                || ((MessageSliceIdentifier)identifier).getSlicerId() != id) {
+            return false;
+        }
+
         final SlicedMessageState<ActorRef> state = stateCache.getIfPresent(identifier);
         if (state == null) {
             LOG.warn("{}: SlicedMessageState not found for {}", logContext, reply);
             reply.getSendTo().tell(new AbortSlicing(identifier), ActorRef.noSender());
-            return;
+            return true;
         }
 
         synchronized (state) {
@@ -209,7 +237,7 @@ public class MessageSlicer implements AutoCloseable {
                 if (failure.isPresent()) {
                     LOG.warn("{}: Received failed {}", logContext, reply);
                     processMessageSliceException(failure.get(), state, reply.getSendTo());
-                    return;
+                    return true;
                 }
 
                 if (state.getCurrentSliceIndex() != reply.getSliceIndex()) {
@@ -217,7 +245,7 @@ public class MessageSlicer implements AutoCloseable {
                             reply.getSliceIndex(), reply, state.getCurrentSliceIndex());
                     reply.getSendTo().tell(new AbortSlicing(identifier), ActorRef.noSender());
                     possiblyRetrySlicing(state, reply.getSendTo());
-                    return;
+                    return true;
                 }
 
                 if (state.isLastSlice(reply.getSliceIndex())) {
@@ -233,6 +261,8 @@ public class MessageSlicer implements AutoCloseable {
                 fail(state, e);
             }
         }
+
+        return true;
     }
 
     private void processMessageSliceException(final MessageSliceException exception,
@@ -263,7 +293,7 @@ public class MessageSlicer implements AutoCloseable {
         stateCache.invalidate(identifier);
     }
 
-    private void stateRemoved(RemovalNotification<Identifier, SlicedMessageState<ActorRef>> notification) {
+    private void stateRemoved(final RemovalNotification<Identifier, SlicedMessageState<ActorRef>> notification) {
         final SlicedMessageState<ActorRef> state = notification.getValue();
         state.close();
         if (notification.wasEvicted()) {
@@ -283,14 +313,14 @@ public class MessageSlicer implements AutoCloseable {
     }
 
     @VisibleForTesting
-    boolean hasState(Identifier forIdentifier) {
+    boolean hasState(final Identifier forIdentifier) {
         boolean exists = stateCache.getIfPresent(forIdentifier) != null;
         stateCache.cleanUp();
         return exists;
     }
 
     public static class Builder {
-        private FileBackedOutputStreamFactory filedBackedStreamFactory;
+        private FileBackedOutputStreamFactory fileBackedStreamFactory;
         private int messageSliceSize = -1;
         private long expireStateAfterInactivityDuration = -1;
         private TimeUnit expireStateAfterInactivityUnit = TimeUnit.MINUTES;
@@ -302,11 +332,11 @@ public class MessageSlicer implements AutoCloseable {
          * is used by the {@link MessageSlicer#slice(SliceOptions)} method if a Serializable message is passed.
          * If Serializable messages aren't passed then the factory need not be set.
          *
-         * @param newFiledBackedStreamFactory the factory for creating FileBackedOutputStream instances
+         * @param newFileBackedStreamFactory the factory for creating FileBackedOutputStream instances
          * @return this Builder
          */
-        public Builder filedBackedStreamFactory(final FileBackedOutputStreamFactory newFiledBackedStreamFactory) {
-            this.filedBackedStreamFactory = Preconditions.checkNotNull(newFiledBackedStreamFactory);
+        public Builder fileBackedStreamFactory(final FileBackedOutputStreamFactory newFileBackedStreamFactory) {
+            this.fileBackedStreamFactory = requireNonNull(newFileBackedStreamFactory);
             return this;
         }
 
@@ -317,7 +347,7 @@ public class MessageSlicer implements AutoCloseable {
          * @return this Builder
          */
         public Builder messageSliceSize(final int newMessageSliceSize) {
-            Preconditions.checkArgument(newMessageSliceSize > 0, "messageSliceSize must be > 0");
+            checkArgument(newMessageSliceSize > 0, "messageSliceSize must be > 0");
             this.messageSliceSize = newMessageSliceSize;
             return this;
         }
@@ -330,7 +360,7 @@ public class MessageSlicer implements AutoCloseable {
          * @return this Builder
          */
         public Builder maxSlicingTries(final int newMaxSlicingTries) {
-            Preconditions.checkArgument(newMaxSlicingTries > 0, "newMaxSlicingTries must be > 0");
+            checkArgument(newMaxSlicingTries > 0, "newMaxSlicingTries must be > 0");
             this.maxSlicingTries = newMaxSlicingTries;
             return this;
         }
@@ -345,7 +375,7 @@ public class MessageSlicer implements AutoCloseable {
          * @return this Builder
          */
         public Builder expireStateAfterInactivity(final long duration, final TimeUnit unit) {
-            Preconditions.checkArgument(duration > 0, "duration must be > 0");
+            checkArgument(duration > 0, "duration must be > 0");
             this.expireStateAfterInactivityDuration = duration;
             this.expireStateAfterInactivityUnit = unit;
             return this;
@@ -358,7 +388,7 @@ public class MessageSlicer implements AutoCloseable {
          * @return this Builder
          */
         public Builder logContext(final String newLogContext) {
-            this.logContext = Preconditions.checkNotNull(newLogContext);
+            this.logContext = requireNonNull(newLogContext);
             return this;
         }