From 7dab745ec7a2407daf77ddd7000d09b9d8f444b5 Mon Sep 17 00:00:00 2001 From: Tom Pantelis Date: Mon, 3 Jul 2017 13:00:07 -0400 Subject: [PATCH] Bug 7449: Add slicer Id to MessageSliceIdentifier Both Shard and RaftActor (via AbstractLeader) (will) have separate MessageSlicer instances and we need to determine to which instance MessageSliceReply messages should be forwarded otherwise the first MessageSlicer will drop messages destined for the second MessageSlicer. Therefore add a slicerId field to MessageSliceIdentifier which is checked by MessageSlicer#handleMessage. Change-Id: Ib39ede29789d5bfaf1fdaea66a8d2994fe6ebcd6 Signed-off-by: Tom Pantelis --- .../messaging/MessageSliceIdentifier.java | 28 ++++++++++++---- .../cluster/messaging/MessageSlicer.java | 33 ++++++++++++++----- .../messaging/MessageAssemblerTest.java | 10 +++--- .../messaging/MessageSliceIdentifierTest.java | 4 ++- .../cluster/messaging/MessageSlicerTest.java | 18 ++++++---- 5 files changed, 65 insertions(+), 28 deletions(-) diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/messaging/MessageSliceIdentifier.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/messaging/MessageSliceIdentifier.java index 2d1147b15f..f81cbb00a1 100644 --- a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/messaging/MessageSliceIdentifier.java +++ b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/messaging/MessageSliceIdentifier.java @@ -14,6 +14,7 @@ import java.io.ObjectInput; import java.io.ObjectOutput; import java.util.concurrent.atomic.AtomicLong; import org.opendaylight.yangtools.concepts.Identifier; +import org.opendaylight.yangtools.concepts.WritableObjects; /** * Identifier for a message slice that is composed of a client-supplied Identifier and an internal counter value. @@ -25,27 +26,34 @@ final class MessageSliceIdentifier implements Identifier { private static final AtomicLong ID_COUNTER = new AtomicLong(1); private final Identifier clientIdentifier; + private final long slicerId; private final long messageId; - MessageSliceIdentifier(final Identifier clientIdentifier) { - this(clientIdentifier, ID_COUNTER.getAndIncrement()); + MessageSliceIdentifier(final Identifier clientIdentifier, final long slicerId) { + this(clientIdentifier, slicerId, ID_COUNTER.getAndIncrement()); } - private MessageSliceIdentifier(final Identifier clientIdentifier, final long messageId) { + private MessageSliceIdentifier(final Identifier clientIdentifier, final long slicerId, final long messageId) { this.clientIdentifier = Preconditions.checkNotNull(clientIdentifier); this.messageId = messageId; + this.slicerId = slicerId; } Identifier getClientIdentifier() { return clientIdentifier; } + long getSlicerId() { + return slicerId; + } + @Override public int hashCode() { final int prime = 31; int result = 1; result = prime * result + clientIdentifier.hashCode(); result = prime * result + (int) (messageId ^ messageId >>> 32); + result = prime * result + (int) (slicerId ^ slicerId >>> 32); return result; } @@ -60,12 +68,14 @@ final class MessageSliceIdentifier implements Identifier { } MessageSliceIdentifier other = (MessageSliceIdentifier) obj; - return other.clientIdentifier.equals(clientIdentifier) && other.messageId == messageId; + return other.clientIdentifier.equals(clientIdentifier) && other.slicerId == slicerId + && other.messageId == messageId; } @Override public String toString() { - return "MessageSliceIdentifier [clientIdentifier=" + clientIdentifier + ", messageId=" + messageId + "]"; + return "MessageSliceIdentifier [clientIdentifier=" + clientIdentifier + ", slicerId=" + slicerId + + ", messageId=" + messageId + "]"; } private Object writeReplace() { @@ -90,12 +100,16 @@ final class MessageSliceIdentifier implements Identifier { @Override public void writeExternal(ObjectOutput out) throws IOException { out.writeObject(messageSliceId.clientIdentifier); - out.writeLong(messageSliceId.messageId); + WritableObjects.writeLongs(out, messageSliceId.slicerId, messageSliceId.messageId); } @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - messageSliceId = new MessageSliceIdentifier((Identifier) in.readObject(), in.readLong()); + final Identifier clientIdentifier = (Identifier) in.readObject(); + final byte header = WritableObjects.readLongHeader(in); + final long slicerId = WritableObjects.readFirstLong(in, header); + final long messageId = WritableObjects.readSecondLong(in, header); + messageSliceId = new MessageSliceIdentifier(clientIdentifier, slicerId, messageId); } private Object readResolve() { diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/messaging/MessageSlicer.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/messaging/MessageSlicer.java index 484a5c2ab9..1647375791 100644 --- a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/messaging/MessageSlicer.java +++ b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/messaging/MessageSlicer.java @@ -19,6 +19,7 @@ import java.io.ObjectOutputStream; import java.io.Serializable; import java.util.Optional; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; import org.opendaylight.controller.cluster.io.FileBackedOutputStream; import org.opendaylight.controller.cluster.io.FileBackedOutputStreamFactory; import org.opendaylight.yangtools.concepts.Identifier; @@ -33,6 +34,7 @@ import org.slf4j.LoggerFactory; */ public class MessageSlicer implements AutoCloseable { private static final Logger LOG = LoggerFactory.getLogger(MessageSlicer.class); + private static final AtomicLong SLICER_ID_COUNTER = new AtomicLong(1); public static final int DEFAULT_MAX_SLICING_TRIES = 3; private final Cache> stateCache; @@ -40,12 +42,15 @@ public class MessageSlicer implements AutoCloseable { private final int messageSliceSize; private final int maxSlicingTries; private final String logContext; + private final long id; private MessageSlicer(Builder builder) { this.filedBackedStreamFactory = builder.filedBackedStreamFactory; this.messageSliceSize = builder.messageSliceSize; this.maxSlicingTries = builder.maxSlicingTries; - this.logContext = builder.logContext; + + id = SLICER_ID_COUNTER.getAndIncrement(); + this.logContext = builder.logContext + "_slicer-id-" + id; CacheBuilder> cacheBuilder = CacheBuilder.newBuilder().removalListener( (RemovalListener>) notification -> stateRemoved(notification)); @@ -57,6 +62,11 @@ public class MessageSlicer implements AutoCloseable { stateCache = cacheBuilder.build(); } + @VisibleForTesting + long getId() { + return id; + } + /** * Returns a new Builder for creating MessageSlicer instances. * @@ -90,7 +100,6 @@ public class MessageSlicer implements AutoCloseable { if (message != null) { LOG.debug("{}: slice: identifier: {}, message: {}", logContext, identifier, message); - Preconditions.checkNotNull(filedBackedStreamFactory, "The FiledBackedStreamFactory must be set in order to call this slice method"); @@ -113,7 +122,7 @@ public class MessageSlicer implements AutoCloseable { private void initializeSlicing(final SliceOptions options, final FileBackedOutputStream fileBackedStream) { final Identifier identifier = options.getIdentifier(); - MessageSliceIdentifier messageSliceId = new MessageSliceIdentifier(identifier); + MessageSliceIdentifier messageSliceId = new MessageSliceIdentifier(identifier, id); SlicedMessageState state = null; try { state = new SlicedMessageState<>(messageSliceId, fileBackedStream, messageSliceSize, maxSlicingTries, @@ -162,8 +171,7 @@ public class MessageSlicer implements AutoCloseable { public boolean handleMessage(final Object message) { if (message instanceof MessageSliceReply) { LOG.debug("{}: handleMessage: {}", logContext, message); - onMessageSliceReply((MessageSliceReply) message); - return true; + return onMessageSliceReply((MessageSliceReply) message); } return false; @@ -194,13 +202,18 @@ public class MessageSlicer implements AutoCloseable { state.getTotalSlices(), state.getLastSliceHashCode(), state.getReplyTarget()); } - private void onMessageSliceReply(final MessageSliceReply reply) { + private boolean onMessageSliceReply(final MessageSliceReply reply) { final Identifier identifier = reply.getIdentifier(); + if (!(identifier instanceof MessageSliceIdentifier) + || ((MessageSliceIdentifier)identifier).getSlicerId() != id) { + return false; + } + final SlicedMessageState state = stateCache.getIfPresent(identifier); if (state == null) { LOG.warn("{}: SlicedMessageState not found for {}", logContext, reply); reply.getSendTo().tell(new AbortSlicing(identifier), ActorRef.noSender()); - return; + return true; } synchronized (state) { @@ -209,7 +222,7 @@ public class MessageSlicer implements AutoCloseable { if (failure.isPresent()) { LOG.warn("{}: Received failed {}", logContext, reply); processMessageSliceException(failure.get(), state, reply.getSendTo()); - return; + return true; } if (state.getCurrentSliceIndex() != reply.getSliceIndex()) { @@ -217,7 +230,7 @@ public class MessageSlicer implements AutoCloseable { reply.getSliceIndex(), reply, state.getCurrentSliceIndex()); reply.getSendTo().tell(new AbortSlicing(identifier), ActorRef.noSender()); possiblyRetrySlicing(state, reply.getSendTo()); - return; + return true; } if (state.isLastSlice(reply.getSliceIndex())) { @@ -233,6 +246,8 @@ public class MessageSlicer implements AutoCloseable { fail(state, e); } } + + return true; } private void processMessageSliceException(final MessageSliceException exception, diff --git a/opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/cluster/messaging/MessageAssemblerTest.java b/opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/cluster/messaging/MessageAssemblerTest.java index e3a68eea26..73773057cf 100644 --- a/opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/cluster/messaging/MessageAssemblerTest.java +++ b/opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/cluster/messaging/MessageAssemblerTest.java @@ -72,7 +72,7 @@ public class MessageAssemblerTest extends AbstractMessagingTest { final FileBackedOutputStream fileBackStream = spy(new FileBackedOutputStream(100000000, null)); doReturn(fileBackStream).when(mockFiledBackedStreamFactory).newInstance(); - final MessageSliceIdentifier identifier = new MessageSliceIdentifier(IDENTIFIER); + final MessageSliceIdentifier identifier = new MessageSliceIdentifier(IDENTIFIER, 1); final BytesMessage message = new BytesMessage(new byte[]{1, 2, 3}); final MessageSlice messageSlice = new MessageSlice(identifier, SerializationUtils.serialize(message), 1, 1, @@ -96,7 +96,7 @@ public class MessageAssemblerTest extends AbstractMessagingTest { doThrow(mockFailure).when(mockByteSource).openStream(); doThrow(mockFailure).when(mockByteSource).openBufferedStream(); - final MessageSliceIdentifier identifier = new MessageSliceIdentifier(IDENTIFIER); + final MessageSliceIdentifier identifier = new MessageSliceIdentifier(IDENTIFIER, 1); final BytesMessage message = new BytesMessage(new byte[]{1, 2, 3}); final MessageSlice messageSlice = new MessageSlice(identifier, SerializationUtils.serialize(message), 1, 1, @@ -121,7 +121,7 @@ public class MessageAssemblerTest extends AbstractMessagingTest { doThrow(mockFailure).when(mockFiledBackedStream).write(anyInt()); doThrow(mockFailure).when(mockFiledBackedStream).flush(); - final MessageSliceIdentifier identifier = new MessageSliceIdentifier(IDENTIFIER); + final MessageSliceIdentifier identifier = new MessageSliceIdentifier(IDENTIFIER, 1); final BytesMessage message = new BytesMessage(new byte[]{1, 2, 3}); final MessageSlice messageSlice = new MessageSlice(identifier, SerializationUtils.serialize(message), 1, 1, @@ -142,7 +142,7 @@ public class MessageAssemblerTest extends AbstractMessagingTest { final int expiryDuration = 200; try (MessageAssembler assembler = newMessageAssemblerBuilder("testAssembledMessageStateExpiration") .expireStateAfterInactivity(expiryDuration, TimeUnit.MILLISECONDS).build()) { - final MessageSliceIdentifier identifier = new MessageSliceIdentifier(IDENTIFIER); + final MessageSliceIdentifier identifier = new MessageSliceIdentifier(IDENTIFIER, 1); final BytesMessage message = new BytesMessage(new byte[]{1, 2, 3}); final MessageSlice messageSlice = new MessageSlice(identifier, SerializationUtils.serialize(message), 1, 2, @@ -163,7 +163,7 @@ public class MessageAssemblerTest extends AbstractMessagingTest { @Test public void testFirstMessageSliceWithInvalidIndex() { try (MessageAssembler assembler = newMessageAssembler("testFirstMessageSliceWithInvalidIndex")) { - final MessageSliceIdentifier identifier = new MessageSliceIdentifier(IDENTIFIER); + final MessageSliceIdentifier identifier = new MessageSliceIdentifier(IDENTIFIER, 1); final MessageSlice messageSlice = new MessageSlice(identifier, new byte[0], 2, 3, 1, testProbe.ref()); assembler.handleMessage(messageSlice, testProbe.ref()); diff --git a/opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/cluster/messaging/MessageSliceIdentifierTest.java b/opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/cluster/messaging/MessageSliceIdentifierTest.java index addfc535ba..9c80033b92 100644 --- a/opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/cluster/messaging/MessageSliceIdentifierTest.java +++ b/opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/cluster/messaging/MessageSliceIdentifierTest.java @@ -21,8 +21,10 @@ public class MessageSliceIdentifierTest { @Test public void testSerialization() { - MessageSliceIdentifier expected = new MessageSliceIdentifier(new StringIdentifier("test")); + MessageSliceIdentifier expected = new MessageSliceIdentifier(new StringIdentifier("test"), 123L); MessageSliceIdentifier cloned = (MessageSliceIdentifier) SerializationUtils.clone(expected); assertEquals("cloned", expected, cloned); + assertEquals("getClientIdentifier", expected.getClientIdentifier(), cloned.getClientIdentifier()); + assertEquals("getSlicerId", expected.getSlicerId(), cloned.getSlicerId()); } } diff --git a/opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/cluster/messaging/MessageSlicerTest.java b/opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/cluster/messaging/MessageSlicerTest.java index 20b6cab818..df12ad2c11 100644 --- a/opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/cluster/messaging/MessageSlicerTest.java +++ b/opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/cluster/messaging/MessageSlicerTest.java @@ -47,13 +47,18 @@ public class MessageSlicerTest extends AbstractMessagingTest { @Test public void testHandledMessages() { - final MessageSliceReply reply = MessageSliceReply.success(IDENTIFIER, 1, testProbe.ref()); - assertEquals("isHandledMessage", Boolean.TRUE, MessageSlicer.isHandledMessage(reply)); - assertEquals("isHandledMessage", Boolean.FALSE, MessageSlicer.isHandledMessage(new Object())); - try (MessageSlicer slicer = newMessageSlicer("testHandledMessages", 100)) { + MessageSliceIdentifier messageSliceId = new MessageSliceIdentifier(IDENTIFIER, slicer.getId()); + final MessageSliceReply reply = MessageSliceReply.success(messageSliceId, 1, testProbe.ref()); + assertEquals("isHandledMessage", Boolean.TRUE, MessageSlicer.isHandledMessage(reply)); + assertEquals("isHandledMessage", Boolean.FALSE, MessageSlicer.isHandledMessage(new Object())); + assertEquals("handledMessage", Boolean.TRUE, slicer.handleMessage(reply)); assertEquals("handledMessage", Boolean.FALSE, slicer.handleMessage(new Object())); + assertEquals("handledMessage", Boolean.FALSE, slicer.handleMessage(MessageSliceReply.success( + IDENTIFIER, 1,testProbe.ref()))); + assertEquals("handledMessage", Boolean.FALSE, slicer.handleMessage(MessageSliceReply.success( + new MessageSliceIdentifier(IDENTIFIER, slicer.getId() + 1), 1,testProbe.ref()))); } } @@ -105,9 +110,10 @@ public class MessageSlicerTest extends AbstractMessagingTest { @Test public void testMessageSliceReplyWithNoState() { try (MessageSlicer slicer = newMessageSlicer("testMessageSliceReplyWithNoState", 1000)) { - slicer.handleMessage(MessageSliceReply.success(IDENTIFIER, 1, testProbe.ref())); + MessageSliceIdentifier messageSliceId = new MessageSliceIdentifier(IDENTIFIER, slicer.getId()); + slicer.handleMessage(MessageSliceReply.success(messageSliceId, 1, testProbe.ref())); final AbortSlicing abortSlicing = testProbe.expectMsgClass(AbortSlicing.class); - assertEquals("Identifier", IDENTIFIER, abortSlicing.getIdentifier()); + assertEquals("Identifier", messageSliceId, abortSlicing.getIdentifier()); } } -- 2.36.6