import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
+import java.util.Iterator;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Predicate;
+import javax.annotation.Nonnull;
import org.opendaylight.controller.cluster.io.FileBackedOutputStream;
import org.opendaylight.controller.cluster.io.FileBackedOutputStreamFactory;
import org.opendaylight.yangtools.concepts.Identifier;
private static final AtomicLong SLICER_ID_COUNTER = new AtomicLong(1);
public static final int DEFAULT_MAX_SLICING_TRIES = 3;
- private final Cache<Identifier, SlicedMessageState<ActorRef>> stateCache;
+ private final Cache<MessageSliceIdentifier, SlicedMessageState<ActorRef>> stateCache;
private final FileBackedOutputStreamFactory fileBackedStreamFactory;
private final int messageSliceSize;
private final int maxSlicingTries;
* options.
*
* @param options the SliceOptions
+ * @return true if the message was sliced, false otherwise
*/
- public void slice(final SliceOptions options) {
+ public boolean slice(final SliceOptions options) {
final Identifier identifier = options.getIdentifier();
final Serializable message = options.getMessage();
final FileBackedOutputStream fileBackedStream;
LOG.debug("{}: Error serializing message for {}", logContext, identifier, e);
fileBackedStream.cleanup();
options.getOnFailureCallback().accept(e);
- return;
+ return false;
}
} else {
fileBackedStream = options.getFileBackedStream();
}
- initializeSlicing(options, fileBackedStream);
+ return initializeSlicing(options, fileBackedStream);
}
- private void initializeSlicing(final SliceOptions options, final FileBackedOutputStream fileBackedStream) {
+ private boolean initializeSlicing(final SliceOptions options, final FileBackedOutputStream fileBackedStream) {
final Identifier identifier = options.getIdentifier();
MessageSliceIdentifier messageSliceId = new MessageSliceIdentifier(identifier, id);
SlicedMessageState<ActorRef> state = null;
LOG.debug("{}: Message does not need to be sliced - sending original message", logContext);
state.close();
sendTo(options, message, options.getReplyTo());
- return;
+ return false;
}
final MessageSlice firstSlice = getNextSliceMessage(state);
stateCache.put(messageSliceId, state);
sendTo(options, firstSlice, ActorRef.noSender());
+ return true;
} catch (IOException e) {
LOG.error("{}: Error initializing SlicedMessageState for {}", logContext, identifier, e);
if (state != null) {
}
options.getOnFailureCallback().accept(e);
+ return false;
}
}
stateCache.invalidateAll();
}
+ /**
+ * Cancels all in-progress sliced message state that matches the given filter.
+ *
+ * @param filter filters by Identifier
+ */
+ public void cancelSlicing(@Nonnull final Predicate<Identifier> filter) {
+ final Iterator<MessageSliceIdentifier> iter = stateCache.asMap().keySet().iterator();
+ while (iter.hasNext()) {
+ if (filter.test(iter.next().getClientIdentifier())) {
+ iter.remove();
+ }
+ }
+ }
+
private static MessageSlice getNextSliceMessage(final SlicedMessageState<ActorRef> state) throws IOException {
final byte[] firstSliceBytes = state.getNextSlice();
return new MessageSlice(state.getIdentifier(), firstSliceBytes, state.getCurrentSliceIndex(),