Bug 7449: Add slicer Id to MessageSliceIdentifier 97/59897/2
authorTom Pantelis <tompantelis@gmail.com>
Mon, 3 Jul 2017 17:00:07 +0000 (13:00 -0400)
committerRobert Varga <nite@hq.sk>
Mon, 3 Jul 2017 18:45:28 +0000 (18:45 +0000)
Both Shard and RaftActor (via AbstractLeader) (will) have separate
MessageSlicer instances and we need to determine to which instance
MessageSliceReply messages should be forwarded otherwise the first
MessageSlicer will drop messages destined for the second MessageSlicer.
Therefore add a slicerId field to MessageSliceIdentifier which is
checked by MessageSlicer#handleMessage.

Change-Id: Ib39ede29789d5bfaf1fdaea66a8d2994fe6ebcd6
Signed-off-by: Tom Pantelis <tompantelis@gmail.com>
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/messaging/MessageSliceIdentifier.java
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/messaging/MessageSlicer.java
opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/cluster/messaging/MessageAssemblerTest.java
opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/cluster/messaging/MessageSliceIdentifierTest.java
opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/cluster/messaging/MessageSlicerTest.java

index 2d1147b..f81cbb0 100644 (file)
@@ -14,6 +14,7 @@ import java.io.ObjectInput;
 import java.io.ObjectOutput;
 import java.util.concurrent.atomic.AtomicLong;
 import org.opendaylight.yangtools.concepts.Identifier;
+import org.opendaylight.yangtools.concepts.WritableObjects;
 
 /**
  * Identifier for a message slice that is composed of a client-supplied Identifier and an internal counter value.
@@ -25,27 +26,34 @@ final class MessageSliceIdentifier implements Identifier {
     private static final AtomicLong ID_COUNTER = new AtomicLong(1);
 
     private final Identifier clientIdentifier;
+    private final long slicerId;
     private final long messageId;
 
-    MessageSliceIdentifier(final Identifier clientIdentifier) {
-        this(clientIdentifier, ID_COUNTER.getAndIncrement());
+    MessageSliceIdentifier(final Identifier clientIdentifier, final long slicerId) {
+        this(clientIdentifier, slicerId, ID_COUNTER.getAndIncrement());
     }
 
-    private MessageSliceIdentifier(final Identifier clientIdentifier, final long messageId) {
+    private MessageSliceIdentifier(final Identifier clientIdentifier, final long slicerId, final long messageId) {
         this.clientIdentifier = Preconditions.checkNotNull(clientIdentifier);
         this.messageId = messageId;
+        this.slicerId = slicerId;
     }
 
     Identifier getClientIdentifier() {
         return clientIdentifier;
     }
 
+    long getSlicerId() {
+        return slicerId;
+    }
+
     @Override
     public int hashCode() {
         final int prime = 31;
         int result = 1;
         result = prime * result + clientIdentifier.hashCode();
         result = prime * result + (int) (messageId ^ messageId >>> 32);
+        result = prime * result + (int) (slicerId ^ slicerId >>> 32);
         return result;
     }
 
@@ -60,12 +68,14 @@ final class MessageSliceIdentifier implements Identifier {
         }
 
         MessageSliceIdentifier other = (MessageSliceIdentifier) obj;
-        return other.clientIdentifier.equals(clientIdentifier) && other.messageId == messageId;
+        return other.clientIdentifier.equals(clientIdentifier) && other.slicerId == slicerId
+                && other.messageId == messageId;
     }
 
     @Override
     public String toString() {
-        return "MessageSliceIdentifier [clientIdentifier=" + clientIdentifier + ", messageId=" + messageId + "]";
+        return "MessageSliceIdentifier [clientIdentifier=" + clientIdentifier + ", slicerId=" + slicerId
+                + ", messageId=" + messageId + "]";
     }
 
     private Object writeReplace() {
@@ -90,12 +100,16 @@ final class MessageSliceIdentifier implements Identifier {
         @Override
         public void writeExternal(ObjectOutput out) throws IOException {
             out.writeObject(messageSliceId.clientIdentifier);
-            out.writeLong(messageSliceId.messageId);
+            WritableObjects.writeLongs(out, messageSliceId.slicerId, messageSliceId.messageId);
         }
 
         @Override
         public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
-            messageSliceId = new MessageSliceIdentifier((Identifier) in.readObject(), in.readLong());
+            final Identifier clientIdentifier = (Identifier) in.readObject();
+            final byte header = WritableObjects.readLongHeader(in);
+            final long slicerId =  WritableObjects.readFirstLong(in, header);
+            final long messageId = WritableObjects.readSecondLong(in, header);
+            messageSliceId = new MessageSliceIdentifier(clientIdentifier, slicerId, messageId);
         }
 
         private Object readResolve() {
index 484a5c2..1647375 100644 (file)
@@ -19,6 +19,7 @@ import java.io.ObjectOutputStream;
 import java.io.Serializable;
 import java.util.Optional;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
 import org.opendaylight.controller.cluster.io.FileBackedOutputStream;
 import org.opendaylight.controller.cluster.io.FileBackedOutputStreamFactory;
 import org.opendaylight.yangtools.concepts.Identifier;
@@ -33,6 +34,7 @@ import org.slf4j.LoggerFactory;
  */
 public class MessageSlicer implements AutoCloseable {
     private static final Logger LOG = LoggerFactory.getLogger(MessageSlicer.class);
+    private static final AtomicLong SLICER_ID_COUNTER = new AtomicLong(1);
     public static final int DEFAULT_MAX_SLICING_TRIES = 3;
 
     private final Cache<Identifier, SlicedMessageState<ActorRef>> stateCache;
@@ -40,12 +42,15 @@ public class MessageSlicer implements AutoCloseable {
     private final int messageSliceSize;
     private final int maxSlicingTries;
     private final String logContext;
+    private final long id;
 
     private MessageSlicer(Builder builder) {
         this.filedBackedStreamFactory = builder.filedBackedStreamFactory;
         this.messageSliceSize = builder.messageSliceSize;
         this.maxSlicingTries = builder.maxSlicingTries;
-        this.logContext = builder.logContext;
+
+        id = SLICER_ID_COUNTER.getAndIncrement();
+        this.logContext = builder.logContext + "_slicer-id-" + id;
 
         CacheBuilder<Identifier, SlicedMessageState<ActorRef>> cacheBuilder = CacheBuilder.newBuilder().removalListener(
                 (RemovalListener<Identifier, SlicedMessageState<ActorRef>>) notification -> stateRemoved(notification));
@@ -57,6 +62,11 @@ public class MessageSlicer implements AutoCloseable {
         stateCache = cacheBuilder.build();
     }
 
+    @VisibleForTesting
+    long getId() {
+        return id;
+    }
+
     /**
      * Returns a new Builder for creating MessageSlicer instances.
      *
@@ -90,7 +100,6 @@ public class MessageSlicer implements AutoCloseable {
         if (message != null) {
             LOG.debug("{}: slice: identifier: {}, message: {}", logContext, identifier, message);
 
-
             Preconditions.checkNotNull(filedBackedStreamFactory,
                     "The FiledBackedStreamFactory must be set in order to call this slice method");
 
@@ -113,7 +122,7 @@ public class MessageSlicer implements AutoCloseable {
 
     private void initializeSlicing(final SliceOptions options, final FileBackedOutputStream fileBackedStream) {
         final Identifier identifier = options.getIdentifier();
-        MessageSliceIdentifier messageSliceId = new MessageSliceIdentifier(identifier);
+        MessageSliceIdentifier messageSliceId = new MessageSliceIdentifier(identifier, id);
         SlicedMessageState<ActorRef> state = null;
         try {
             state = new SlicedMessageState<>(messageSliceId, fileBackedStream, messageSliceSize, maxSlicingTries,
@@ -162,8 +171,7 @@ public class MessageSlicer implements AutoCloseable {
     public boolean handleMessage(final Object message) {
         if (message instanceof MessageSliceReply) {
             LOG.debug("{}: handleMessage: {}", logContext, message);
-            onMessageSliceReply((MessageSliceReply) message);
-            return true;
+            return onMessageSliceReply((MessageSliceReply) message);
         }
 
         return false;
@@ -194,13 +202,18 @@ public class MessageSlicer implements AutoCloseable {
                 state.getTotalSlices(), state.getLastSliceHashCode(), state.getReplyTarget());
     }
 
-    private void onMessageSliceReply(final MessageSliceReply reply) {
+    private boolean onMessageSliceReply(final MessageSliceReply reply) {
         final Identifier identifier = reply.getIdentifier();
+        if (!(identifier instanceof MessageSliceIdentifier)
+                || ((MessageSliceIdentifier)identifier).getSlicerId() != id) {
+            return false;
+        }
+
         final SlicedMessageState<ActorRef> state = stateCache.getIfPresent(identifier);
         if (state == null) {
             LOG.warn("{}: SlicedMessageState not found for {}", logContext, reply);
             reply.getSendTo().tell(new AbortSlicing(identifier), ActorRef.noSender());
-            return;
+            return true;
         }
 
         synchronized (state) {
@@ -209,7 +222,7 @@ public class MessageSlicer implements AutoCloseable {
                 if (failure.isPresent()) {
                     LOG.warn("{}: Received failed {}", logContext, reply);
                     processMessageSliceException(failure.get(), state, reply.getSendTo());
-                    return;
+                    return true;
                 }
 
                 if (state.getCurrentSliceIndex() != reply.getSliceIndex()) {
@@ -217,7 +230,7 @@ public class MessageSlicer implements AutoCloseable {
                             reply.getSliceIndex(), reply, state.getCurrentSliceIndex());
                     reply.getSendTo().tell(new AbortSlicing(identifier), ActorRef.noSender());
                     possiblyRetrySlicing(state, reply.getSendTo());
-                    return;
+                    return true;
                 }
 
                 if (state.isLastSlice(reply.getSliceIndex())) {
@@ -233,6 +246,8 @@ public class MessageSlicer implements AutoCloseable {
                 fail(state, e);
             }
         }
+
+        return true;
     }
 
     private void processMessageSliceException(final MessageSliceException exception,
index e3a68ee..7377305 100644 (file)
@@ -72,7 +72,7 @@ public class MessageAssemblerTest extends AbstractMessagingTest {
             final FileBackedOutputStream fileBackStream = spy(new FileBackedOutputStream(100000000, null));
             doReturn(fileBackStream).when(mockFiledBackedStreamFactory).newInstance();
 
-            final MessageSliceIdentifier identifier = new MessageSliceIdentifier(IDENTIFIER);
+            final MessageSliceIdentifier identifier = new MessageSliceIdentifier(IDENTIFIER, 1);
             final BytesMessage message = new BytesMessage(new byte[]{1, 2, 3});
 
             final MessageSlice messageSlice = new MessageSlice(identifier, SerializationUtils.serialize(message), 1, 1,
@@ -96,7 +96,7 @@ public class MessageAssemblerTest extends AbstractMessagingTest {
             doThrow(mockFailure).when(mockByteSource).openStream();
             doThrow(mockFailure).when(mockByteSource).openBufferedStream();
 
-            final MessageSliceIdentifier identifier = new MessageSliceIdentifier(IDENTIFIER);
+            final MessageSliceIdentifier identifier = new MessageSliceIdentifier(IDENTIFIER, 1);
             final BytesMessage message = new BytesMessage(new byte[]{1, 2, 3});
 
             final MessageSlice messageSlice = new MessageSlice(identifier, SerializationUtils.serialize(message), 1, 1,
@@ -121,7 +121,7 @@ public class MessageAssemblerTest extends AbstractMessagingTest {
             doThrow(mockFailure).when(mockFiledBackedStream).write(anyInt());
             doThrow(mockFailure).when(mockFiledBackedStream).flush();
 
-            final MessageSliceIdentifier identifier = new MessageSliceIdentifier(IDENTIFIER);
+            final MessageSliceIdentifier identifier = new MessageSliceIdentifier(IDENTIFIER, 1);
             final BytesMessage message = new BytesMessage(new byte[]{1, 2, 3});
 
             final MessageSlice messageSlice = new MessageSlice(identifier, SerializationUtils.serialize(message), 1, 1,
@@ -142,7 +142,7 @@ public class MessageAssemblerTest extends AbstractMessagingTest {
         final int expiryDuration = 200;
         try (MessageAssembler assembler = newMessageAssemblerBuilder("testAssembledMessageStateExpiration")
                 .expireStateAfterInactivity(expiryDuration, TimeUnit.MILLISECONDS).build()) {
-            final MessageSliceIdentifier identifier = new MessageSliceIdentifier(IDENTIFIER);
+            final MessageSliceIdentifier identifier = new MessageSliceIdentifier(IDENTIFIER, 1);
             final BytesMessage message = new BytesMessage(new byte[]{1, 2, 3});
 
             final MessageSlice messageSlice = new MessageSlice(identifier, SerializationUtils.serialize(message), 1, 2,
@@ -163,7 +163,7 @@ public class MessageAssemblerTest extends AbstractMessagingTest {
     @Test
     public void testFirstMessageSliceWithInvalidIndex() {
         try (MessageAssembler assembler = newMessageAssembler("testFirstMessageSliceWithInvalidIndex")) {
-            final MessageSliceIdentifier identifier = new MessageSliceIdentifier(IDENTIFIER);
+            final MessageSliceIdentifier identifier = new MessageSliceIdentifier(IDENTIFIER, 1);
             final MessageSlice messageSlice = new MessageSlice(identifier, new byte[0], 2, 3, 1, testProbe.ref());
             assembler.handleMessage(messageSlice, testProbe.ref());
 
index addfc53..9c80033 100644 (file)
@@ -21,8 +21,10 @@ public class MessageSliceIdentifierTest {
 
     @Test
     public void testSerialization() {
-        MessageSliceIdentifier expected = new MessageSliceIdentifier(new StringIdentifier("test"));
+        MessageSliceIdentifier expected = new MessageSliceIdentifier(new StringIdentifier("test"), 123L);
         MessageSliceIdentifier cloned = (MessageSliceIdentifier) SerializationUtils.clone(expected);
         assertEquals("cloned", expected, cloned);
+        assertEquals("getClientIdentifier", expected.getClientIdentifier(), cloned.getClientIdentifier());
+        assertEquals("getSlicerId", expected.getSlicerId(), cloned.getSlicerId());
     }
 }
index 20b6cab..df12ad2 100644 (file)
@@ -47,13 +47,18 @@ public class MessageSlicerTest extends AbstractMessagingTest {
 
     @Test
     public void testHandledMessages() {
-        final MessageSliceReply reply = MessageSliceReply.success(IDENTIFIER, 1, testProbe.ref());
-        assertEquals("isHandledMessage", Boolean.TRUE, MessageSlicer.isHandledMessage(reply));
-        assertEquals("isHandledMessage", Boolean.FALSE, MessageSlicer.isHandledMessage(new Object()));
-
         try (MessageSlicer slicer = newMessageSlicer("testHandledMessages", 100)) {
+            MessageSliceIdentifier messageSliceId = new MessageSliceIdentifier(IDENTIFIER, slicer.getId());
+            final MessageSliceReply reply = MessageSliceReply.success(messageSliceId, 1, testProbe.ref());
+            assertEquals("isHandledMessage", Boolean.TRUE, MessageSlicer.isHandledMessage(reply));
+            assertEquals("isHandledMessage", Boolean.FALSE, MessageSlicer.isHandledMessage(new Object()));
+
             assertEquals("handledMessage", Boolean.TRUE, slicer.handleMessage(reply));
             assertEquals("handledMessage", Boolean.FALSE, slicer.handleMessage(new Object()));
+            assertEquals("handledMessage", Boolean.FALSE, slicer.handleMessage(MessageSliceReply.success(
+                    IDENTIFIER, 1,testProbe.ref())));
+            assertEquals("handledMessage", Boolean.FALSE, slicer.handleMessage(MessageSliceReply.success(
+                    new MessageSliceIdentifier(IDENTIFIER, slicer.getId() + 1), 1,testProbe.ref())));
         }
     }
 
@@ -105,9 +110,10 @@ public class MessageSlicerTest extends AbstractMessagingTest {
     @Test
     public void testMessageSliceReplyWithNoState() {
         try (MessageSlicer slicer = newMessageSlicer("testMessageSliceReplyWithNoState", 1000)) {
-            slicer.handleMessage(MessageSliceReply.success(IDENTIFIER, 1, testProbe.ref()));
+            MessageSliceIdentifier messageSliceId = new MessageSliceIdentifier(IDENTIFIER, slicer.getId());
+            slicer.handleMessage(MessageSliceReply.success(messageSliceId, 1, testProbe.ref()));
             final AbortSlicing abortSlicing = testProbe.expectMsgClass(AbortSlicing.class);
-            assertEquals("Identifier", IDENTIFIER, abortSlicing.getIdentifier());
+            assertEquals("Identifier", messageSliceId, abortSlicing.getIdentifier());
         }
     }