import java.io.Serializable;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
import org.opendaylight.controller.cluster.io.FileBackedOutputStream;
import org.opendaylight.controller.cluster.io.FileBackedOutputStreamFactory;
import org.opendaylight.yangtools.concepts.Identifier;
*/
public class MessageSlicer implements AutoCloseable {
private static final Logger LOG = LoggerFactory.getLogger(MessageSlicer.class);
+ private static final AtomicLong SLICER_ID_COUNTER = new AtomicLong(1);
public static final int DEFAULT_MAX_SLICING_TRIES = 3;
private final Cache<Identifier, SlicedMessageState<ActorRef>> stateCache;
private final int messageSliceSize;
private final int maxSlicingTries;
private final String logContext;
+ private final long id;
private MessageSlicer(Builder builder) {
this.filedBackedStreamFactory = builder.filedBackedStreamFactory;
this.messageSliceSize = builder.messageSliceSize;
this.maxSlicingTries = builder.maxSlicingTries;
- this.logContext = builder.logContext;
+
+ id = SLICER_ID_COUNTER.getAndIncrement();
+ this.logContext = builder.logContext + "_slicer-id-" + id;
CacheBuilder<Identifier, SlicedMessageState<ActorRef>> cacheBuilder = CacheBuilder.newBuilder().removalListener(
(RemovalListener<Identifier, SlicedMessageState<ActorRef>>) notification -> stateRemoved(notification));
stateCache = cacheBuilder.build();
}
+ @VisibleForTesting
+ long getId() {
+ return id;
+ }
+
/**
* Returns a new Builder for creating MessageSlicer instances.
*
if (message != null) {
LOG.debug("{}: slice: identifier: {}, message: {}", logContext, identifier, message);
-
Preconditions.checkNotNull(filedBackedStreamFactory,
"The FiledBackedStreamFactory must be set in order to call this slice method");
private void initializeSlicing(final SliceOptions options, final FileBackedOutputStream fileBackedStream) {
final Identifier identifier = options.getIdentifier();
- MessageSliceIdentifier messageSliceId = new MessageSliceIdentifier(identifier);
+ MessageSliceIdentifier messageSliceId = new MessageSliceIdentifier(identifier, id);
SlicedMessageState<ActorRef> state = null;
try {
state = new SlicedMessageState<>(messageSliceId, fileBackedStream, messageSliceSize, maxSlicingTries,
public boolean handleMessage(final Object message) {
if (message instanceof MessageSliceReply) {
LOG.debug("{}: handleMessage: {}", logContext, message);
- onMessageSliceReply((MessageSliceReply) message);
- return true;
+ return onMessageSliceReply((MessageSliceReply) message);
}
return false;
state.getTotalSlices(), state.getLastSliceHashCode(), state.getReplyTarget());
}
- private void onMessageSliceReply(final MessageSliceReply reply) {
+ private boolean onMessageSliceReply(final MessageSliceReply reply) {
final Identifier identifier = reply.getIdentifier();
+ if (!(identifier instanceof MessageSliceIdentifier)
+ || ((MessageSliceIdentifier)identifier).getSlicerId() != id) {
+ return false;
+ }
+
final SlicedMessageState<ActorRef> state = stateCache.getIfPresent(identifier);
if (state == null) {
LOG.warn("{}: SlicedMessageState not found for {}", logContext, reply);
reply.getSendTo().tell(new AbortSlicing(identifier), ActorRef.noSender());
- return;
+ return true;
}
synchronized (state) {
if (failure.isPresent()) {
LOG.warn("{}: Received failed {}", logContext, reply);
processMessageSliceException(failure.get(), state, reply.getSendTo());
- return;
+ return true;
}
if (state.getCurrentSliceIndex() != reply.getSliceIndex()) {
reply.getSliceIndex(), reply, state.getCurrentSliceIndex());
reply.getSendTo().tell(new AbortSlicing(identifier), ActorRef.noSender());
possiblyRetrySlicing(state, reply.getSendTo());
- return;
+ return true;
}
if (state.isLastSlice(reply.getSliceIndex())) {
fail(state, e);
}
}
+
+ return true;
}
private void processMessageSliceException(final MessageSliceException exception,