Bug 7449: Add slicer Id to MessageSliceIdentifier 97/59897/2
authorTom Pantelis <tompantelis@gmail.com>
Mon, 3 Jul 2017 17:00:07 +0000 (13:00 -0400)
committerRobert Varga <nite@hq.sk>
Mon, 3 Jul 2017 18:45:28 +0000 (18:45 +0000)
Both Shard and RaftActor (via AbstractLeader) (will) have separate
MessageSlicer instances and we need to determine to which instance
MessageSliceReply messages should be forwarded otherwise the first
MessageSlicer will drop messages destined for the second MessageSlicer.
Therefore add a slicerId field to MessageSliceIdentifier which is
checked by MessageSlicer#handleMessage.

Change-Id: Ib39ede29789d5bfaf1fdaea66a8d2994fe6ebcd6
Signed-off-by: Tom Pantelis <tompantelis@gmail.com>
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/messaging/MessageSliceIdentifier.java
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/messaging/MessageSlicer.java
opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/cluster/messaging/MessageAssemblerTest.java
opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/cluster/messaging/MessageSliceIdentifierTest.java
opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/cluster/messaging/MessageSlicerTest.java

index 2d1147b15f89561f63fa404cea8cca3d392c930a..f81cbb00a11085b95fd8228d7b09e7c48825f92b 100644 (file)
@@ -14,6 +14,7 @@ import java.io.ObjectInput;
 import java.io.ObjectOutput;
 import java.util.concurrent.atomic.AtomicLong;
 import org.opendaylight.yangtools.concepts.Identifier;
 import java.io.ObjectOutput;
 import java.util.concurrent.atomic.AtomicLong;
 import org.opendaylight.yangtools.concepts.Identifier;
+import org.opendaylight.yangtools.concepts.WritableObjects;
 
 /**
  * Identifier for a message slice that is composed of a client-supplied Identifier and an internal counter value.
 
 /**
  * Identifier for a message slice that is composed of a client-supplied Identifier and an internal counter value.
@@ -25,27 +26,34 @@ final class MessageSliceIdentifier implements Identifier {
     private static final AtomicLong ID_COUNTER = new AtomicLong(1);
 
     private final Identifier clientIdentifier;
     private static final AtomicLong ID_COUNTER = new AtomicLong(1);
 
     private final Identifier clientIdentifier;
+    private final long slicerId;
     private final long messageId;
 
     private final long messageId;
 
-    MessageSliceIdentifier(final Identifier clientIdentifier) {
-        this(clientIdentifier, ID_COUNTER.getAndIncrement());
+    MessageSliceIdentifier(final Identifier clientIdentifier, final long slicerId) {
+        this(clientIdentifier, slicerId, ID_COUNTER.getAndIncrement());
     }
 
     }
 
-    private MessageSliceIdentifier(final Identifier clientIdentifier, final long messageId) {
+    private MessageSliceIdentifier(final Identifier clientIdentifier, final long slicerId, final long messageId) {
         this.clientIdentifier = Preconditions.checkNotNull(clientIdentifier);
         this.messageId = messageId;
         this.clientIdentifier = Preconditions.checkNotNull(clientIdentifier);
         this.messageId = messageId;
+        this.slicerId = slicerId;
     }
 
     Identifier getClientIdentifier() {
         return clientIdentifier;
     }
 
     }
 
     Identifier getClientIdentifier() {
         return clientIdentifier;
     }
 
+    long getSlicerId() {
+        return slicerId;
+    }
+
     @Override
     public int hashCode() {
         final int prime = 31;
         int result = 1;
         result = prime * result + clientIdentifier.hashCode();
         result = prime * result + (int) (messageId ^ messageId >>> 32);
     @Override
     public int hashCode() {
         final int prime = 31;
         int result = 1;
         result = prime * result + clientIdentifier.hashCode();
         result = prime * result + (int) (messageId ^ messageId >>> 32);
+        result = prime * result + (int) (slicerId ^ slicerId >>> 32);
         return result;
     }
 
         return result;
     }
 
@@ -60,12 +68,14 @@ final class MessageSliceIdentifier implements Identifier {
         }
 
         MessageSliceIdentifier other = (MessageSliceIdentifier) obj;
         }
 
         MessageSliceIdentifier other = (MessageSliceIdentifier) obj;
-        return other.clientIdentifier.equals(clientIdentifier) && other.messageId == messageId;
+        return other.clientIdentifier.equals(clientIdentifier) && other.slicerId == slicerId
+                && other.messageId == messageId;
     }
 
     @Override
     public String toString() {
     }
 
     @Override
     public String toString() {
-        return "MessageSliceIdentifier [clientIdentifier=" + clientIdentifier + ", messageId=" + messageId + "]";
+        return "MessageSliceIdentifier [clientIdentifier=" + clientIdentifier + ", slicerId=" + slicerId
+                + ", messageId=" + messageId + "]";
     }
 
     private Object writeReplace() {
     }
 
     private Object writeReplace() {
@@ -90,12 +100,16 @@ final class MessageSliceIdentifier implements Identifier {
         @Override
         public void writeExternal(ObjectOutput out) throws IOException {
             out.writeObject(messageSliceId.clientIdentifier);
         @Override
         public void writeExternal(ObjectOutput out) throws IOException {
             out.writeObject(messageSliceId.clientIdentifier);
-            out.writeLong(messageSliceId.messageId);
+            WritableObjects.writeLongs(out, messageSliceId.slicerId, messageSliceId.messageId);
         }
 
         @Override
         public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
         }
 
         @Override
         public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
-            messageSliceId = new MessageSliceIdentifier((Identifier) in.readObject(), in.readLong());
+            final Identifier clientIdentifier = (Identifier) in.readObject();
+            final byte header = WritableObjects.readLongHeader(in);
+            final long slicerId =  WritableObjects.readFirstLong(in, header);
+            final long messageId = WritableObjects.readSecondLong(in, header);
+            messageSliceId = new MessageSliceIdentifier(clientIdentifier, slicerId, messageId);
         }
 
         private Object readResolve() {
         }
 
         private Object readResolve() {
index 484a5c2ab9f7d52fe6aa8ea41cff5a81d3d1f040..164737579162a5dfcdec0be0abebfd3a8ee1244e 100644 (file)
@@ -19,6 +19,7 @@ import java.io.ObjectOutputStream;
 import java.io.Serializable;
 import java.util.Optional;
 import java.util.concurrent.TimeUnit;
 import java.io.Serializable;
 import java.util.Optional;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
 import org.opendaylight.controller.cluster.io.FileBackedOutputStream;
 import org.opendaylight.controller.cluster.io.FileBackedOutputStreamFactory;
 import org.opendaylight.yangtools.concepts.Identifier;
 import org.opendaylight.controller.cluster.io.FileBackedOutputStream;
 import org.opendaylight.controller.cluster.io.FileBackedOutputStreamFactory;
 import org.opendaylight.yangtools.concepts.Identifier;
@@ -33,6 +34,7 @@ import org.slf4j.LoggerFactory;
  */
 public class MessageSlicer implements AutoCloseable {
     private static final Logger LOG = LoggerFactory.getLogger(MessageSlicer.class);
  */
 public class MessageSlicer implements AutoCloseable {
     private static final Logger LOG = LoggerFactory.getLogger(MessageSlicer.class);
+    private static final AtomicLong SLICER_ID_COUNTER = new AtomicLong(1);
     public static final int DEFAULT_MAX_SLICING_TRIES = 3;
 
     private final Cache<Identifier, SlicedMessageState<ActorRef>> stateCache;
     public static final int DEFAULT_MAX_SLICING_TRIES = 3;
 
     private final Cache<Identifier, SlicedMessageState<ActorRef>> stateCache;
@@ -40,12 +42,15 @@ public class MessageSlicer implements AutoCloseable {
     private final int messageSliceSize;
     private final int maxSlicingTries;
     private final String logContext;
     private final int messageSliceSize;
     private final int maxSlicingTries;
     private final String logContext;
+    private final long id;
 
     private MessageSlicer(Builder builder) {
         this.filedBackedStreamFactory = builder.filedBackedStreamFactory;
         this.messageSliceSize = builder.messageSliceSize;
         this.maxSlicingTries = builder.maxSlicingTries;
 
     private MessageSlicer(Builder builder) {
         this.filedBackedStreamFactory = builder.filedBackedStreamFactory;
         this.messageSliceSize = builder.messageSliceSize;
         this.maxSlicingTries = builder.maxSlicingTries;
-        this.logContext = builder.logContext;
+
+        id = SLICER_ID_COUNTER.getAndIncrement();
+        this.logContext = builder.logContext + "_slicer-id-" + id;
 
         CacheBuilder<Identifier, SlicedMessageState<ActorRef>> cacheBuilder = CacheBuilder.newBuilder().removalListener(
                 (RemovalListener<Identifier, SlicedMessageState<ActorRef>>) notification -> stateRemoved(notification));
 
         CacheBuilder<Identifier, SlicedMessageState<ActorRef>> cacheBuilder = CacheBuilder.newBuilder().removalListener(
                 (RemovalListener<Identifier, SlicedMessageState<ActorRef>>) notification -> stateRemoved(notification));
@@ -57,6 +62,11 @@ public class MessageSlicer implements AutoCloseable {
         stateCache = cacheBuilder.build();
     }
 
         stateCache = cacheBuilder.build();
     }
 
+    @VisibleForTesting
+    long getId() {
+        return id;
+    }
+
     /**
      * Returns a new Builder for creating MessageSlicer instances.
      *
     /**
      * Returns a new Builder for creating MessageSlicer instances.
      *
@@ -90,7 +100,6 @@ public class MessageSlicer implements AutoCloseable {
         if (message != null) {
             LOG.debug("{}: slice: identifier: {}, message: {}", logContext, identifier, message);
 
         if (message != null) {
             LOG.debug("{}: slice: identifier: {}, message: {}", logContext, identifier, message);
 
-
             Preconditions.checkNotNull(filedBackedStreamFactory,
                     "The FiledBackedStreamFactory must be set in order to call this slice method");
 
             Preconditions.checkNotNull(filedBackedStreamFactory,
                     "The FiledBackedStreamFactory must be set in order to call this slice method");
 
@@ -113,7 +122,7 @@ public class MessageSlicer implements AutoCloseable {
 
     private void initializeSlicing(final SliceOptions options, final FileBackedOutputStream fileBackedStream) {
         final Identifier identifier = options.getIdentifier();
 
     private void initializeSlicing(final SliceOptions options, final FileBackedOutputStream fileBackedStream) {
         final Identifier identifier = options.getIdentifier();
-        MessageSliceIdentifier messageSliceId = new MessageSliceIdentifier(identifier);
+        MessageSliceIdentifier messageSliceId = new MessageSliceIdentifier(identifier, id);
         SlicedMessageState<ActorRef> state = null;
         try {
             state = new SlicedMessageState<>(messageSliceId, fileBackedStream, messageSliceSize, maxSlicingTries,
         SlicedMessageState<ActorRef> state = null;
         try {
             state = new SlicedMessageState<>(messageSliceId, fileBackedStream, messageSliceSize, maxSlicingTries,
@@ -162,8 +171,7 @@ public class MessageSlicer implements AutoCloseable {
     public boolean handleMessage(final Object message) {
         if (message instanceof MessageSliceReply) {
             LOG.debug("{}: handleMessage: {}", logContext, message);
     public boolean handleMessage(final Object message) {
         if (message instanceof MessageSliceReply) {
             LOG.debug("{}: handleMessage: {}", logContext, message);
-            onMessageSliceReply((MessageSliceReply) message);
-            return true;
+            return onMessageSliceReply((MessageSliceReply) message);
         }
 
         return false;
         }
 
         return false;
@@ -194,13 +202,18 @@ public class MessageSlicer implements AutoCloseable {
                 state.getTotalSlices(), state.getLastSliceHashCode(), state.getReplyTarget());
     }
 
                 state.getTotalSlices(), state.getLastSliceHashCode(), state.getReplyTarget());
     }
 
-    private void onMessageSliceReply(final MessageSliceReply reply) {
+    private boolean onMessageSliceReply(final MessageSliceReply reply) {
         final Identifier identifier = reply.getIdentifier();
         final Identifier identifier = reply.getIdentifier();
+        if (!(identifier instanceof MessageSliceIdentifier)
+                || ((MessageSliceIdentifier)identifier).getSlicerId() != id) {
+            return false;
+        }
+
         final SlicedMessageState<ActorRef> state = stateCache.getIfPresent(identifier);
         if (state == null) {
             LOG.warn("{}: SlicedMessageState not found for {}", logContext, reply);
             reply.getSendTo().tell(new AbortSlicing(identifier), ActorRef.noSender());
         final SlicedMessageState<ActorRef> state = stateCache.getIfPresent(identifier);
         if (state == null) {
             LOG.warn("{}: SlicedMessageState not found for {}", logContext, reply);
             reply.getSendTo().tell(new AbortSlicing(identifier), ActorRef.noSender());
-            return;
+            return true;
         }
 
         synchronized (state) {
         }
 
         synchronized (state) {
@@ -209,7 +222,7 @@ public class MessageSlicer implements AutoCloseable {
                 if (failure.isPresent()) {
                     LOG.warn("{}: Received failed {}", logContext, reply);
                     processMessageSliceException(failure.get(), state, reply.getSendTo());
                 if (failure.isPresent()) {
                     LOG.warn("{}: Received failed {}", logContext, reply);
                     processMessageSliceException(failure.get(), state, reply.getSendTo());
-                    return;
+                    return true;
                 }
 
                 if (state.getCurrentSliceIndex() != reply.getSliceIndex()) {
                 }
 
                 if (state.getCurrentSliceIndex() != reply.getSliceIndex()) {
@@ -217,7 +230,7 @@ public class MessageSlicer implements AutoCloseable {
                             reply.getSliceIndex(), reply, state.getCurrentSliceIndex());
                     reply.getSendTo().tell(new AbortSlicing(identifier), ActorRef.noSender());
                     possiblyRetrySlicing(state, reply.getSendTo());
                             reply.getSliceIndex(), reply, state.getCurrentSliceIndex());
                     reply.getSendTo().tell(new AbortSlicing(identifier), ActorRef.noSender());
                     possiblyRetrySlicing(state, reply.getSendTo());
-                    return;
+                    return true;
                 }
 
                 if (state.isLastSlice(reply.getSliceIndex())) {
                 }
 
                 if (state.isLastSlice(reply.getSliceIndex())) {
@@ -233,6 +246,8 @@ public class MessageSlicer implements AutoCloseable {
                 fail(state, e);
             }
         }
                 fail(state, e);
             }
         }
+
+        return true;
     }
 
     private void processMessageSliceException(final MessageSliceException exception,
     }
 
     private void processMessageSliceException(final MessageSliceException exception,
index e3a68eea26501d91c8df4a398665870d2a9000d6..73773057cf74311147ad5caafaa4dafd3ca0a8c3 100644 (file)
@@ -72,7 +72,7 @@ public class MessageAssemblerTest extends AbstractMessagingTest {
             final FileBackedOutputStream fileBackStream = spy(new FileBackedOutputStream(100000000, null));
             doReturn(fileBackStream).when(mockFiledBackedStreamFactory).newInstance();
 
             final FileBackedOutputStream fileBackStream = spy(new FileBackedOutputStream(100000000, null));
             doReturn(fileBackStream).when(mockFiledBackedStreamFactory).newInstance();
 
-            final MessageSliceIdentifier identifier = new MessageSliceIdentifier(IDENTIFIER);
+            final MessageSliceIdentifier identifier = new MessageSliceIdentifier(IDENTIFIER, 1);
             final BytesMessage message = new BytesMessage(new byte[]{1, 2, 3});
 
             final MessageSlice messageSlice = new MessageSlice(identifier, SerializationUtils.serialize(message), 1, 1,
             final BytesMessage message = new BytesMessage(new byte[]{1, 2, 3});
 
             final MessageSlice messageSlice = new MessageSlice(identifier, SerializationUtils.serialize(message), 1, 1,
@@ -96,7 +96,7 @@ public class MessageAssemblerTest extends AbstractMessagingTest {
             doThrow(mockFailure).when(mockByteSource).openStream();
             doThrow(mockFailure).when(mockByteSource).openBufferedStream();
 
             doThrow(mockFailure).when(mockByteSource).openStream();
             doThrow(mockFailure).when(mockByteSource).openBufferedStream();
 
-            final MessageSliceIdentifier identifier = new MessageSliceIdentifier(IDENTIFIER);
+            final MessageSliceIdentifier identifier = new MessageSliceIdentifier(IDENTIFIER, 1);
             final BytesMessage message = new BytesMessage(new byte[]{1, 2, 3});
 
             final MessageSlice messageSlice = new MessageSlice(identifier, SerializationUtils.serialize(message), 1, 1,
             final BytesMessage message = new BytesMessage(new byte[]{1, 2, 3});
 
             final MessageSlice messageSlice = new MessageSlice(identifier, SerializationUtils.serialize(message), 1, 1,
@@ -121,7 +121,7 @@ public class MessageAssemblerTest extends AbstractMessagingTest {
             doThrow(mockFailure).when(mockFiledBackedStream).write(anyInt());
             doThrow(mockFailure).when(mockFiledBackedStream).flush();
 
             doThrow(mockFailure).when(mockFiledBackedStream).write(anyInt());
             doThrow(mockFailure).when(mockFiledBackedStream).flush();
 
-            final MessageSliceIdentifier identifier = new MessageSliceIdentifier(IDENTIFIER);
+            final MessageSliceIdentifier identifier = new MessageSliceIdentifier(IDENTIFIER, 1);
             final BytesMessage message = new BytesMessage(new byte[]{1, 2, 3});
 
             final MessageSlice messageSlice = new MessageSlice(identifier, SerializationUtils.serialize(message), 1, 1,
             final BytesMessage message = new BytesMessage(new byte[]{1, 2, 3});
 
             final MessageSlice messageSlice = new MessageSlice(identifier, SerializationUtils.serialize(message), 1, 1,
@@ -142,7 +142,7 @@ public class MessageAssemblerTest extends AbstractMessagingTest {
         final int expiryDuration = 200;
         try (MessageAssembler assembler = newMessageAssemblerBuilder("testAssembledMessageStateExpiration")
                 .expireStateAfterInactivity(expiryDuration, TimeUnit.MILLISECONDS).build()) {
         final int expiryDuration = 200;
         try (MessageAssembler assembler = newMessageAssemblerBuilder("testAssembledMessageStateExpiration")
                 .expireStateAfterInactivity(expiryDuration, TimeUnit.MILLISECONDS).build()) {
-            final MessageSliceIdentifier identifier = new MessageSliceIdentifier(IDENTIFIER);
+            final MessageSliceIdentifier identifier = new MessageSliceIdentifier(IDENTIFIER, 1);
             final BytesMessage message = new BytesMessage(new byte[]{1, 2, 3});
 
             final MessageSlice messageSlice = new MessageSlice(identifier, SerializationUtils.serialize(message), 1, 2,
             final BytesMessage message = new BytesMessage(new byte[]{1, 2, 3});
 
             final MessageSlice messageSlice = new MessageSlice(identifier, SerializationUtils.serialize(message), 1, 2,
@@ -163,7 +163,7 @@ public class MessageAssemblerTest extends AbstractMessagingTest {
     @Test
     public void testFirstMessageSliceWithInvalidIndex() {
         try (MessageAssembler assembler = newMessageAssembler("testFirstMessageSliceWithInvalidIndex")) {
     @Test
     public void testFirstMessageSliceWithInvalidIndex() {
         try (MessageAssembler assembler = newMessageAssembler("testFirstMessageSliceWithInvalidIndex")) {
-            final MessageSliceIdentifier identifier = new MessageSliceIdentifier(IDENTIFIER);
+            final MessageSliceIdentifier identifier = new MessageSliceIdentifier(IDENTIFIER, 1);
             final MessageSlice messageSlice = new MessageSlice(identifier, new byte[0], 2, 3, 1, testProbe.ref());
             assembler.handleMessage(messageSlice, testProbe.ref());
 
             final MessageSlice messageSlice = new MessageSlice(identifier, new byte[0], 2, 3, 1, testProbe.ref());
             assembler.handleMessage(messageSlice, testProbe.ref());
 
index addfc535ba13a2668038d48a64c0f46f27eb473a..9c80033b92014bd08b43a4bbdfd4a321fc5dd780 100644 (file)
@@ -21,8 +21,10 @@ public class MessageSliceIdentifierTest {
 
     @Test
     public void testSerialization() {
 
     @Test
     public void testSerialization() {
-        MessageSliceIdentifier expected = new MessageSliceIdentifier(new StringIdentifier("test"));
+        MessageSliceIdentifier expected = new MessageSliceIdentifier(new StringIdentifier("test"), 123L);
         MessageSliceIdentifier cloned = (MessageSliceIdentifier) SerializationUtils.clone(expected);
         assertEquals("cloned", expected, cloned);
         MessageSliceIdentifier cloned = (MessageSliceIdentifier) SerializationUtils.clone(expected);
         assertEquals("cloned", expected, cloned);
+        assertEquals("getClientIdentifier", expected.getClientIdentifier(), cloned.getClientIdentifier());
+        assertEquals("getSlicerId", expected.getSlicerId(), cloned.getSlicerId());
     }
 }
     }
 }
index 20b6cab818ae641786361cb90874190dde1e8f49..df12ad2c1158c5e5b63b95c758261dbe8321b954 100644 (file)
@@ -47,13 +47,18 @@ public class MessageSlicerTest extends AbstractMessagingTest {
 
     @Test
     public void testHandledMessages() {
 
     @Test
     public void testHandledMessages() {
-        final MessageSliceReply reply = MessageSliceReply.success(IDENTIFIER, 1, testProbe.ref());
-        assertEquals("isHandledMessage", Boolean.TRUE, MessageSlicer.isHandledMessage(reply));
-        assertEquals("isHandledMessage", Boolean.FALSE, MessageSlicer.isHandledMessage(new Object()));
-
         try (MessageSlicer slicer = newMessageSlicer("testHandledMessages", 100)) {
         try (MessageSlicer slicer = newMessageSlicer("testHandledMessages", 100)) {
+            MessageSliceIdentifier messageSliceId = new MessageSliceIdentifier(IDENTIFIER, slicer.getId());
+            final MessageSliceReply reply = MessageSliceReply.success(messageSliceId, 1, testProbe.ref());
+            assertEquals("isHandledMessage", Boolean.TRUE, MessageSlicer.isHandledMessage(reply));
+            assertEquals("isHandledMessage", Boolean.FALSE, MessageSlicer.isHandledMessage(new Object()));
+
             assertEquals("handledMessage", Boolean.TRUE, slicer.handleMessage(reply));
             assertEquals("handledMessage", Boolean.FALSE, slicer.handleMessage(new Object()));
             assertEquals("handledMessage", Boolean.TRUE, slicer.handleMessage(reply));
             assertEquals("handledMessage", Boolean.FALSE, slicer.handleMessage(new Object()));
+            assertEquals("handledMessage", Boolean.FALSE, slicer.handleMessage(MessageSliceReply.success(
+                    IDENTIFIER, 1,testProbe.ref())));
+            assertEquals("handledMessage", Boolean.FALSE, slicer.handleMessage(MessageSliceReply.success(
+                    new MessageSliceIdentifier(IDENTIFIER, slicer.getId() + 1), 1,testProbe.ref())));
         }
     }
 
         }
     }
 
@@ -105,9 +110,10 @@ public class MessageSlicerTest extends AbstractMessagingTest {
     @Test
     public void testMessageSliceReplyWithNoState() {
         try (MessageSlicer slicer = newMessageSlicer("testMessageSliceReplyWithNoState", 1000)) {
     @Test
     public void testMessageSliceReplyWithNoState() {
         try (MessageSlicer slicer = newMessageSlicer("testMessageSliceReplyWithNoState", 1000)) {
-            slicer.handleMessage(MessageSliceReply.success(IDENTIFIER, 1, testProbe.ref()));
+            MessageSliceIdentifier messageSliceId = new MessageSliceIdentifier(IDENTIFIER, slicer.getId());
+            slicer.handleMessage(MessageSliceReply.success(messageSliceId, 1, testProbe.ref()));
             final AbortSlicing abortSlicing = testProbe.expectMsgClass(AbortSlicing.class);
             final AbortSlicing abortSlicing = testProbe.expectMsgClass(AbortSlicing.class);
-            assertEquals("Identifier", IDENTIFIER, abortSlicing.getIdentifier());
+            assertEquals("Identifier", messageSliceId, abortSlicing.getIdentifier());
         }
     }
 
         }
     }