Slice front-end request messages
[controller.git] / opendaylight / md-sal / sal-clustering-commons / src / main / java / org / opendaylight / controller / cluster / messaging / MessageSlicer.java
index ec7dcca06a3106448c91eb0b7091fc24e28e3aa1..4c3d67bdaaca759d0e81acee561dff09b8bab27b 100644 (file)
@@ -17,9 +17,12 @@ import com.google.common.cache.RemovalNotification;
 import java.io.IOException;
 import java.io.ObjectOutputStream;
 import java.io.Serializable;
 import java.io.IOException;
 import java.io.ObjectOutputStream;
 import java.io.Serializable;
+import java.util.Iterator;
 import java.util.Optional;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.Optional;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Predicate;
+import javax.annotation.Nonnull;
 import org.opendaylight.controller.cluster.io.FileBackedOutputStream;
 import org.opendaylight.controller.cluster.io.FileBackedOutputStreamFactory;
 import org.opendaylight.yangtools.concepts.Identifier;
 import org.opendaylight.controller.cluster.io.FileBackedOutputStream;
 import org.opendaylight.controller.cluster.io.FileBackedOutputStreamFactory;
 import org.opendaylight.yangtools.concepts.Identifier;
@@ -37,7 +40,7 @@ public class MessageSlicer implements AutoCloseable {
     private static final AtomicLong SLICER_ID_COUNTER = new AtomicLong(1);
     public static final int DEFAULT_MAX_SLICING_TRIES = 3;
 
     private static final AtomicLong SLICER_ID_COUNTER = new AtomicLong(1);
     public static final int DEFAULT_MAX_SLICING_TRIES = 3;
 
-    private final Cache<Identifier, SlicedMessageState<ActorRef>> stateCache;
+    private final Cache<MessageSliceIdentifier, SlicedMessageState<ActorRef>> stateCache;
     private final FileBackedOutputStreamFactory fileBackedStreamFactory;
     private final int messageSliceSize;
     private final int maxSlicingTries;
     private final FileBackedOutputStreamFactory fileBackedStreamFactory;
     private final int messageSliceSize;
     private final int maxSlicingTries;
@@ -92,8 +95,9 @@ public class MessageSlicer implements AutoCloseable {
      * options.
      *
      * @param options the SliceOptions
      * options.
      *
      * @param options the SliceOptions
+     * @return true if the message was sliced, false otherwise
      */
      */
-    public void slice(final SliceOptions options) {
+    public boolean slice(final SliceOptions options) {
         final Identifier identifier = options.getIdentifier();
         final Serializable message = options.getMessage();
         final FileBackedOutputStream fileBackedStream;
         final Identifier identifier = options.getIdentifier();
         final Serializable message = options.getMessage();
         final FileBackedOutputStream fileBackedStream;
@@ -111,16 +115,16 @@ public class MessageSlicer implements AutoCloseable {
                 LOG.debug("{}: Error serializing message for {}", logContext, identifier, e);
                 fileBackedStream.cleanup();
                 options.getOnFailureCallback().accept(e);
                 LOG.debug("{}: Error serializing message for {}", logContext, identifier, e);
                 fileBackedStream.cleanup();
                 options.getOnFailureCallback().accept(e);
-                return;
+                return false;
             }
         } else {
             fileBackedStream = options.getFileBackedStream();
         }
 
             }
         } else {
             fileBackedStream = options.getFileBackedStream();
         }
 
-        initializeSlicing(options, fileBackedStream);
+        return initializeSlicing(options, fileBackedStream);
     }
 
     }
 
-    private void initializeSlicing(final SliceOptions options, final FileBackedOutputStream fileBackedStream) {
+    private boolean initializeSlicing(final SliceOptions options, final FileBackedOutputStream fileBackedStream) {
         final Identifier identifier = options.getIdentifier();
         MessageSliceIdentifier messageSliceId = new MessageSliceIdentifier(identifier, id);
         SlicedMessageState<ActorRef> state = null;
         final Identifier identifier = options.getIdentifier();
         MessageSliceIdentifier messageSliceId = new MessageSliceIdentifier(identifier, id);
         SlicedMessageState<ActorRef> state = null;
@@ -133,7 +137,7 @@ public class MessageSlicer implements AutoCloseable {
                 LOG.debug("{}: Message does not need to be sliced - sending original message", logContext);
                 state.close();
                 sendTo(options, message, options.getReplyTo());
                 LOG.debug("{}: Message does not need to be sliced - sending original message", logContext);
                 state.close();
                 sendTo(options, message, options.getReplyTo());
-                return;
+                return false;
             }
 
             final MessageSlice firstSlice = getNextSliceMessage(state);
             }
 
             final MessageSlice firstSlice = getNextSliceMessage(state);
@@ -142,6 +146,7 @@ public class MessageSlicer implements AutoCloseable {
 
             stateCache.put(messageSliceId, state);
             sendTo(options, firstSlice, ActorRef.noSender());
 
             stateCache.put(messageSliceId, state);
             sendTo(options, firstSlice, ActorRef.noSender());
+            return true;
         } catch (IOException e) {
             LOG.error("{}: Error initializing SlicedMessageState for {}", logContext, identifier, e);
             if (state != null) {
         } catch (IOException e) {
             LOG.error("{}: Error initializing SlicedMessageState for {}", logContext, identifier, e);
             if (state != null) {
@@ -151,6 +156,7 @@ public class MessageSlicer implements AutoCloseable {
             }
 
             options.getOnFailureCallback().accept(e);
             }
 
             options.getOnFailureCallback().accept(e);
+            return false;
         }
     }
 
         }
     }
 
@@ -196,6 +202,20 @@ public class MessageSlicer implements AutoCloseable {
         stateCache.invalidateAll();
     }
 
         stateCache.invalidateAll();
     }
 
+    /**
+     * Cancels all in-progress sliced message state that matches the given filter.
+     *
+     * @param filter filters by Identifier
+     */
+    public void cancelSlicing(@Nonnull final Predicate<Identifier> filter) {
+        final Iterator<MessageSliceIdentifier> iter = stateCache.asMap().keySet().iterator();
+        while (iter.hasNext()) {
+            if (filter.test(iter.next().getClientIdentifier())) {
+                iter.remove();
+            }
+        }
+    }
+
     private static MessageSlice getNextSliceMessage(final SlicedMessageState<ActorRef> state) throws IOException {
         final byte[] firstSliceBytes = state.getNextSlice();
         return new MessageSlice(state.getIdentifier(), firstSliceBytes, state.getCurrentSliceIndex(),
     private static MessageSlice getNextSliceMessage(final SlicedMessageState<ActorRef> state) throws IOException {
         final byte[] firstSliceBytes = state.getNextSlice();
         return new MessageSlice(state.getIdentifier(), firstSliceBytes, state.getCurrentSliceIndex(),