Bug 7449: Add slicer Id to MessageSliceIdentifier
[controller.git] / opendaylight / md-sal / sal-clustering-commons / src / main / java / org / opendaylight / controller / cluster / messaging / MessageSlicer.java
index 484a5c2ab9f7d52fe6aa8ea41cff5a81d3d1f040..164737579162a5dfcdec0be0abebfd3a8ee1244e 100644 (file)
@@ -19,6 +19,7 @@ import java.io.ObjectOutputStream;
 import java.io.Serializable;
 import java.util.Optional;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
 import org.opendaylight.controller.cluster.io.FileBackedOutputStream;
 import org.opendaylight.controller.cluster.io.FileBackedOutputStreamFactory;
 import org.opendaylight.yangtools.concepts.Identifier;
@@ -33,6 +34,7 @@ import org.slf4j.LoggerFactory;
  */
 public class MessageSlicer implements AutoCloseable {
     private static final Logger LOG = LoggerFactory.getLogger(MessageSlicer.class);
+    private static final AtomicLong SLICER_ID_COUNTER = new AtomicLong(1);
     public static final int DEFAULT_MAX_SLICING_TRIES = 3;
 
     private final Cache<Identifier, SlicedMessageState<ActorRef>> stateCache;
@@ -40,12 +42,15 @@ public class MessageSlicer implements AutoCloseable {
     private final int messageSliceSize;
     private final int maxSlicingTries;
     private final String logContext;
+    private final long id;
 
     private MessageSlicer(Builder builder) {
         this.filedBackedStreamFactory = builder.filedBackedStreamFactory;
         this.messageSliceSize = builder.messageSliceSize;
         this.maxSlicingTries = builder.maxSlicingTries;
-        this.logContext = builder.logContext;
+
+        id = SLICER_ID_COUNTER.getAndIncrement();
+        this.logContext = builder.logContext + "_slicer-id-" + id;
 
         CacheBuilder<Identifier, SlicedMessageState<ActorRef>> cacheBuilder = CacheBuilder.newBuilder().removalListener(
                 (RemovalListener<Identifier, SlicedMessageState<ActorRef>>) notification -> stateRemoved(notification));
@@ -57,6 +62,11 @@ public class MessageSlicer implements AutoCloseable {
         stateCache = cacheBuilder.build();
     }
 
+    @VisibleForTesting
+    long getId() {
+        return id;
+    }
+
     /**
      * Returns a new Builder for creating MessageSlicer instances.
      *
@@ -90,7 +100,6 @@ public class MessageSlicer implements AutoCloseable {
         if (message != null) {
             LOG.debug("{}: slice: identifier: {}, message: {}", logContext, identifier, message);
 
-
             Preconditions.checkNotNull(filedBackedStreamFactory,
                     "The FiledBackedStreamFactory must be set in order to call this slice method");
 
@@ -113,7 +122,7 @@ public class MessageSlicer implements AutoCloseable {
 
     private void initializeSlicing(final SliceOptions options, final FileBackedOutputStream fileBackedStream) {
         final Identifier identifier = options.getIdentifier();
-        MessageSliceIdentifier messageSliceId = new MessageSliceIdentifier(identifier);
+        MessageSliceIdentifier messageSliceId = new MessageSliceIdentifier(identifier, id);
         SlicedMessageState<ActorRef> state = null;
         try {
             state = new SlicedMessageState<>(messageSliceId, fileBackedStream, messageSliceSize, maxSlicingTries,
@@ -162,8 +171,7 @@ public class MessageSlicer implements AutoCloseable {
     public boolean handleMessage(final Object message) {
         if (message instanceof MessageSliceReply) {
             LOG.debug("{}: handleMessage: {}", logContext, message);
-            onMessageSliceReply((MessageSliceReply) message);
-            return true;
+            return onMessageSliceReply((MessageSliceReply) message);
         }
 
         return false;
@@ -194,13 +202,18 @@ public class MessageSlicer implements AutoCloseable {
                 state.getTotalSlices(), state.getLastSliceHashCode(), state.getReplyTarget());
     }
 
-    private void onMessageSliceReply(final MessageSliceReply reply) {
+    private boolean onMessageSliceReply(final MessageSliceReply reply) {
         final Identifier identifier = reply.getIdentifier();
+        if (!(identifier instanceof MessageSliceIdentifier)
+                || ((MessageSliceIdentifier)identifier).getSlicerId() != id) {
+            return false;
+        }
+
         final SlicedMessageState<ActorRef> state = stateCache.getIfPresent(identifier);
         if (state == null) {
             LOG.warn("{}: SlicedMessageState not found for {}", logContext, reply);
             reply.getSendTo().tell(new AbortSlicing(identifier), ActorRef.noSender());
-            return;
+            return true;
         }
 
         synchronized (state) {
@@ -209,7 +222,7 @@ public class MessageSlicer implements AutoCloseable {
                 if (failure.isPresent()) {
                     LOG.warn("{}: Received failed {}", logContext, reply);
                     processMessageSliceException(failure.get(), state, reply.getSendTo());
-                    return;
+                    return true;
                 }
 
                 if (state.getCurrentSliceIndex() != reply.getSliceIndex()) {
@@ -217,7 +230,7 @@ public class MessageSlicer implements AutoCloseable {
                             reply.getSliceIndex(), reply, state.getCurrentSliceIndex());
                     reply.getSendTo().tell(new AbortSlicing(identifier), ActorRef.noSender());
                     possiblyRetrySlicing(state, reply.getSendTo());
-                    return;
+                    return true;
                 }
 
                 if (state.isLastSlice(reply.getSliceIndex())) {
@@ -233,6 +246,8 @@ public class MessageSlicer implements AutoCloseable {
                 fail(state, e);
             }
         }
+
+        return true;
     }
 
     private void processMessageSliceException(final MessageSliceException exception,