Both Shard and RaftActor (via AbstractLeader) (will) have separate
MessageSlicer instances and we need to determine to which instance
MessageSliceReply messages should be forwarded otherwise the first
MessageSlicer will drop messages destined for the second MessageSlicer.
Therefore add a slicerId field to MessageSliceIdentifier which is
checked by MessageSlicer#handleMessage.
Change-Id: Ib39ede29789d5bfaf1fdaea66a8d2994fe6ebcd6
Signed-off-by: Tom Pantelis <tompantelis@gmail.com>
import java.io.ObjectOutput;
import java.util.concurrent.atomic.AtomicLong;
import org.opendaylight.yangtools.concepts.Identifier;
import java.io.ObjectOutput;
import java.util.concurrent.atomic.AtomicLong;
import org.opendaylight.yangtools.concepts.Identifier;
+import org.opendaylight.yangtools.concepts.WritableObjects;
/**
* Identifier for a message slice that is composed of a client-supplied Identifier and an internal counter value.
/**
* Identifier for a message slice that is composed of a client-supplied Identifier and an internal counter value.
private static final AtomicLong ID_COUNTER = new AtomicLong(1);
private final Identifier clientIdentifier;
private static final AtomicLong ID_COUNTER = new AtomicLong(1);
private final Identifier clientIdentifier;
+ private final long slicerId;
private final long messageId;
private final long messageId;
- MessageSliceIdentifier(final Identifier clientIdentifier) {
- this(clientIdentifier, ID_COUNTER.getAndIncrement());
+ MessageSliceIdentifier(final Identifier clientIdentifier, final long slicerId) {
+ this(clientIdentifier, slicerId, ID_COUNTER.getAndIncrement());
- private MessageSliceIdentifier(final Identifier clientIdentifier, final long messageId) {
+ private MessageSliceIdentifier(final Identifier clientIdentifier, final long slicerId, final long messageId) {
this.clientIdentifier = Preconditions.checkNotNull(clientIdentifier);
this.messageId = messageId;
this.clientIdentifier = Preconditions.checkNotNull(clientIdentifier);
this.messageId = messageId;
+ this.slicerId = slicerId;
}
Identifier getClientIdentifier() {
return clientIdentifier;
}
}
Identifier getClientIdentifier() {
return clientIdentifier;
}
+ long getSlicerId() {
+ return slicerId;
+ }
+
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + clientIdentifier.hashCode();
result = prime * result + (int) (messageId ^ messageId >>> 32);
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + clientIdentifier.hashCode();
result = prime * result + (int) (messageId ^ messageId >>> 32);
+ result = prime * result + (int) (slicerId ^ slicerId >>> 32);
}
MessageSliceIdentifier other = (MessageSliceIdentifier) obj;
}
MessageSliceIdentifier other = (MessageSliceIdentifier) obj;
- return other.clientIdentifier.equals(clientIdentifier) && other.messageId == messageId;
+ return other.clientIdentifier.equals(clientIdentifier) && other.slicerId == slicerId
+ && other.messageId == messageId;
}
@Override
public String toString() {
}
@Override
public String toString() {
- return "MessageSliceIdentifier [clientIdentifier=" + clientIdentifier + ", messageId=" + messageId + "]";
+ return "MessageSliceIdentifier [clientIdentifier=" + clientIdentifier + ", slicerId=" + slicerId
+ + ", messageId=" + messageId + "]";
}
private Object writeReplace() {
}
private Object writeReplace() {
@Override
public void writeExternal(ObjectOutput out) throws IOException {
out.writeObject(messageSliceId.clientIdentifier);
@Override
public void writeExternal(ObjectOutput out) throws IOException {
out.writeObject(messageSliceId.clientIdentifier);
- out.writeLong(messageSliceId.messageId);
+ WritableObjects.writeLongs(out, messageSliceId.slicerId, messageSliceId.messageId);
}
@Override
public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
}
@Override
public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
- messageSliceId = new MessageSliceIdentifier((Identifier) in.readObject(), in.readLong());
+ final Identifier clientIdentifier = (Identifier) in.readObject();
+ final byte header = WritableObjects.readLongHeader(in);
+ final long slicerId = WritableObjects.readFirstLong(in, header);
+ final long messageId = WritableObjects.readSecondLong(in, header);
+ messageSliceId = new MessageSliceIdentifier(clientIdentifier, slicerId, messageId);
}
private Object readResolve() {
}
private Object readResolve() {
import java.io.Serializable;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.io.Serializable;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
import org.opendaylight.controller.cluster.io.FileBackedOutputStream;
import org.opendaylight.controller.cluster.io.FileBackedOutputStreamFactory;
import org.opendaylight.yangtools.concepts.Identifier;
import org.opendaylight.controller.cluster.io.FileBackedOutputStream;
import org.opendaylight.controller.cluster.io.FileBackedOutputStreamFactory;
import org.opendaylight.yangtools.concepts.Identifier;
*/
public class MessageSlicer implements AutoCloseable {
private static final Logger LOG = LoggerFactory.getLogger(MessageSlicer.class);
*/
public class MessageSlicer implements AutoCloseable {
private static final Logger LOG = LoggerFactory.getLogger(MessageSlicer.class);
+ private static final AtomicLong SLICER_ID_COUNTER = new AtomicLong(1);
public static final int DEFAULT_MAX_SLICING_TRIES = 3;
private final Cache<Identifier, SlicedMessageState<ActorRef>> stateCache;
public static final int DEFAULT_MAX_SLICING_TRIES = 3;
private final Cache<Identifier, SlicedMessageState<ActorRef>> stateCache;
private final int messageSliceSize;
private final int maxSlicingTries;
private final String logContext;
private final int messageSliceSize;
private final int maxSlicingTries;
private final String logContext;
private MessageSlicer(Builder builder) {
this.filedBackedStreamFactory = builder.filedBackedStreamFactory;
this.messageSliceSize = builder.messageSliceSize;
this.maxSlicingTries = builder.maxSlicingTries;
private MessageSlicer(Builder builder) {
this.filedBackedStreamFactory = builder.filedBackedStreamFactory;
this.messageSliceSize = builder.messageSliceSize;
this.maxSlicingTries = builder.maxSlicingTries;
- this.logContext = builder.logContext;
+
+ id = SLICER_ID_COUNTER.getAndIncrement();
+ this.logContext = builder.logContext + "_slicer-id-" + id;
CacheBuilder<Identifier, SlicedMessageState<ActorRef>> cacheBuilder = CacheBuilder.newBuilder().removalListener(
(RemovalListener<Identifier, SlicedMessageState<ActorRef>>) notification -> stateRemoved(notification));
CacheBuilder<Identifier, SlicedMessageState<ActorRef>> cacheBuilder = CacheBuilder.newBuilder().removalListener(
(RemovalListener<Identifier, SlicedMessageState<ActorRef>>) notification -> stateRemoved(notification));
stateCache = cacheBuilder.build();
}
stateCache = cacheBuilder.build();
}
+ @VisibleForTesting
+ long getId() {
+ return id;
+ }
+
/**
* Returns a new Builder for creating MessageSlicer instances.
*
/**
* Returns a new Builder for creating MessageSlicer instances.
*
if (message != null) {
LOG.debug("{}: slice: identifier: {}, message: {}", logContext, identifier, message);
if (message != null) {
LOG.debug("{}: slice: identifier: {}, message: {}", logContext, identifier, message);
Preconditions.checkNotNull(filedBackedStreamFactory,
"The FiledBackedStreamFactory must be set in order to call this slice method");
Preconditions.checkNotNull(filedBackedStreamFactory,
"The FiledBackedStreamFactory must be set in order to call this slice method");
private void initializeSlicing(final SliceOptions options, final FileBackedOutputStream fileBackedStream) {
final Identifier identifier = options.getIdentifier();
private void initializeSlicing(final SliceOptions options, final FileBackedOutputStream fileBackedStream) {
final Identifier identifier = options.getIdentifier();
- MessageSliceIdentifier messageSliceId = new MessageSliceIdentifier(identifier);
+ MessageSliceIdentifier messageSliceId = new MessageSliceIdentifier(identifier, id);
SlicedMessageState<ActorRef> state = null;
try {
state = new SlicedMessageState<>(messageSliceId, fileBackedStream, messageSliceSize, maxSlicingTries,
SlicedMessageState<ActorRef> state = null;
try {
state = new SlicedMessageState<>(messageSliceId, fileBackedStream, messageSliceSize, maxSlicingTries,
public boolean handleMessage(final Object message) {
if (message instanceof MessageSliceReply) {
LOG.debug("{}: handleMessage: {}", logContext, message);
public boolean handleMessage(final Object message) {
if (message instanceof MessageSliceReply) {
LOG.debug("{}: handleMessage: {}", logContext, message);
- onMessageSliceReply((MessageSliceReply) message);
- return true;
+ return onMessageSliceReply((MessageSliceReply) message);
state.getTotalSlices(), state.getLastSliceHashCode(), state.getReplyTarget());
}
state.getTotalSlices(), state.getLastSliceHashCode(), state.getReplyTarget());
}
- private void onMessageSliceReply(final MessageSliceReply reply) {
+ private boolean onMessageSliceReply(final MessageSliceReply reply) {
final Identifier identifier = reply.getIdentifier();
final Identifier identifier = reply.getIdentifier();
+ if (!(identifier instanceof MessageSliceIdentifier)
+ || ((MessageSliceIdentifier)identifier).getSlicerId() != id) {
+ return false;
+ }
+
final SlicedMessageState<ActorRef> state = stateCache.getIfPresent(identifier);
if (state == null) {
LOG.warn("{}: SlicedMessageState not found for {}", logContext, reply);
reply.getSendTo().tell(new AbortSlicing(identifier), ActorRef.noSender());
final SlicedMessageState<ActorRef> state = stateCache.getIfPresent(identifier);
if (state == null) {
LOG.warn("{}: SlicedMessageState not found for {}", logContext, reply);
reply.getSendTo().tell(new AbortSlicing(identifier), ActorRef.noSender());
if (failure.isPresent()) {
LOG.warn("{}: Received failed {}", logContext, reply);
processMessageSliceException(failure.get(), state, reply.getSendTo());
if (failure.isPresent()) {
LOG.warn("{}: Received failed {}", logContext, reply);
processMessageSliceException(failure.get(), state, reply.getSendTo());
}
if (state.getCurrentSliceIndex() != reply.getSliceIndex()) {
}
if (state.getCurrentSliceIndex() != reply.getSliceIndex()) {
reply.getSliceIndex(), reply, state.getCurrentSliceIndex());
reply.getSendTo().tell(new AbortSlicing(identifier), ActorRef.noSender());
possiblyRetrySlicing(state, reply.getSendTo());
reply.getSliceIndex(), reply, state.getCurrentSliceIndex());
reply.getSendTo().tell(new AbortSlicing(identifier), ActorRef.noSender());
possiblyRetrySlicing(state, reply.getSendTo());
}
if (state.isLastSlice(reply.getSliceIndex())) {
}
if (state.isLastSlice(reply.getSliceIndex())) {
}
private void processMessageSliceException(final MessageSliceException exception,
}
private void processMessageSliceException(final MessageSliceException exception,
final FileBackedOutputStream fileBackStream = spy(new FileBackedOutputStream(100000000, null));
doReturn(fileBackStream).when(mockFiledBackedStreamFactory).newInstance();
final FileBackedOutputStream fileBackStream = spy(new FileBackedOutputStream(100000000, null));
doReturn(fileBackStream).when(mockFiledBackedStreamFactory).newInstance();
- final MessageSliceIdentifier identifier = new MessageSliceIdentifier(IDENTIFIER);
+ final MessageSliceIdentifier identifier = new MessageSliceIdentifier(IDENTIFIER, 1);
final BytesMessage message = new BytesMessage(new byte[]{1, 2, 3});
final MessageSlice messageSlice = new MessageSlice(identifier, SerializationUtils.serialize(message), 1, 1,
final BytesMessage message = new BytesMessage(new byte[]{1, 2, 3});
final MessageSlice messageSlice = new MessageSlice(identifier, SerializationUtils.serialize(message), 1, 1,
doThrow(mockFailure).when(mockByteSource).openStream();
doThrow(mockFailure).when(mockByteSource).openBufferedStream();
doThrow(mockFailure).when(mockByteSource).openStream();
doThrow(mockFailure).when(mockByteSource).openBufferedStream();
- final MessageSliceIdentifier identifier = new MessageSliceIdentifier(IDENTIFIER);
+ final MessageSliceIdentifier identifier = new MessageSliceIdentifier(IDENTIFIER, 1);
final BytesMessage message = new BytesMessage(new byte[]{1, 2, 3});
final MessageSlice messageSlice = new MessageSlice(identifier, SerializationUtils.serialize(message), 1, 1,
final BytesMessage message = new BytesMessage(new byte[]{1, 2, 3});
final MessageSlice messageSlice = new MessageSlice(identifier, SerializationUtils.serialize(message), 1, 1,
doThrow(mockFailure).when(mockFiledBackedStream).write(anyInt());
doThrow(mockFailure).when(mockFiledBackedStream).flush();
doThrow(mockFailure).when(mockFiledBackedStream).write(anyInt());
doThrow(mockFailure).when(mockFiledBackedStream).flush();
- final MessageSliceIdentifier identifier = new MessageSliceIdentifier(IDENTIFIER);
+ final MessageSliceIdentifier identifier = new MessageSliceIdentifier(IDENTIFIER, 1);
final BytesMessage message = new BytesMessage(new byte[]{1, 2, 3});
final MessageSlice messageSlice = new MessageSlice(identifier, SerializationUtils.serialize(message), 1, 1,
final BytesMessage message = new BytesMessage(new byte[]{1, 2, 3});
final MessageSlice messageSlice = new MessageSlice(identifier, SerializationUtils.serialize(message), 1, 1,
final int expiryDuration = 200;
try (MessageAssembler assembler = newMessageAssemblerBuilder("testAssembledMessageStateExpiration")
.expireStateAfterInactivity(expiryDuration, TimeUnit.MILLISECONDS).build()) {
final int expiryDuration = 200;
try (MessageAssembler assembler = newMessageAssemblerBuilder("testAssembledMessageStateExpiration")
.expireStateAfterInactivity(expiryDuration, TimeUnit.MILLISECONDS).build()) {
- final MessageSliceIdentifier identifier = new MessageSliceIdentifier(IDENTIFIER);
+ final MessageSliceIdentifier identifier = new MessageSliceIdentifier(IDENTIFIER, 1);
final BytesMessage message = new BytesMessage(new byte[]{1, 2, 3});
final MessageSlice messageSlice = new MessageSlice(identifier, SerializationUtils.serialize(message), 1, 2,
final BytesMessage message = new BytesMessage(new byte[]{1, 2, 3});
final MessageSlice messageSlice = new MessageSlice(identifier, SerializationUtils.serialize(message), 1, 2,
@Test
public void testFirstMessageSliceWithInvalidIndex() {
try (MessageAssembler assembler = newMessageAssembler("testFirstMessageSliceWithInvalidIndex")) {
@Test
public void testFirstMessageSliceWithInvalidIndex() {
try (MessageAssembler assembler = newMessageAssembler("testFirstMessageSliceWithInvalidIndex")) {
- final MessageSliceIdentifier identifier = new MessageSliceIdentifier(IDENTIFIER);
+ final MessageSliceIdentifier identifier = new MessageSliceIdentifier(IDENTIFIER, 1);
final MessageSlice messageSlice = new MessageSlice(identifier, new byte[0], 2, 3, 1, testProbe.ref());
assembler.handleMessage(messageSlice, testProbe.ref());
final MessageSlice messageSlice = new MessageSlice(identifier, new byte[0], 2, 3, 1, testProbe.ref());
assembler.handleMessage(messageSlice, testProbe.ref());
@Test
public void testSerialization() {
@Test
public void testSerialization() {
- MessageSliceIdentifier expected = new MessageSliceIdentifier(new StringIdentifier("test"));
+ MessageSliceIdentifier expected = new MessageSliceIdentifier(new StringIdentifier("test"), 123L);
MessageSliceIdentifier cloned = (MessageSliceIdentifier) SerializationUtils.clone(expected);
assertEquals("cloned", expected, cloned);
MessageSliceIdentifier cloned = (MessageSliceIdentifier) SerializationUtils.clone(expected);
assertEquals("cloned", expected, cloned);
+ assertEquals("getClientIdentifier", expected.getClientIdentifier(), cloned.getClientIdentifier());
+ assertEquals("getSlicerId", expected.getSlicerId(), cloned.getSlicerId());
@Test
public void testHandledMessages() {
@Test
public void testHandledMessages() {
- final MessageSliceReply reply = MessageSliceReply.success(IDENTIFIER, 1, testProbe.ref());
- assertEquals("isHandledMessage", Boolean.TRUE, MessageSlicer.isHandledMessage(reply));
- assertEquals("isHandledMessage", Boolean.FALSE, MessageSlicer.isHandledMessage(new Object()));
-
try (MessageSlicer slicer = newMessageSlicer("testHandledMessages", 100)) {
try (MessageSlicer slicer = newMessageSlicer("testHandledMessages", 100)) {
+ MessageSliceIdentifier messageSliceId = new MessageSliceIdentifier(IDENTIFIER, slicer.getId());
+ final MessageSliceReply reply = MessageSliceReply.success(messageSliceId, 1, testProbe.ref());
+ assertEquals("isHandledMessage", Boolean.TRUE, MessageSlicer.isHandledMessage(reply));
+ assertEquals("isHandledMessage", Boolean.FALSE, MessageSlicer.isHandledMessage(new Object()));
+
assertEquals("handledMessage", Boolean.TRUE, slicer.handleMessage(reply));
assertEquals("handledMessage", Boolean.FALSE, slicer.handleMessage(new Object()));
assertEquals("handledMessage", Boolean.TRUE, slicer.handleMessage(reply));
assertEquals("handledMessage", Boolean.FALSE, slicer.handleMessage(new Object()));
+ assertEquals("handledMessage", Boolean.FALSE, slicer.handleMessage(MessageSliceReply.success(
+ IDENTIFIER, 1,testProbe.ref())));
+ assertEquals("handledMessage", Boolean.FALSE, slicer.handleMessage(MessageSliceReply.success(
+ new MessageSliceIdentifier(IDENTIFIER, slicer.getId() + 1), 1,testProbe.ref())));
@Test
public void testMessageSliceReplyWithNoState() {
try (MessageSlicer slicer = newMessageSlicer("testMessageSliceReplyWithNoState", 1000)) {
@Test
public void testMessageSliceReplyWithNoState() {
try (MessageSlicer slicer = newMessageSlicer("testMessageSliceReplyWithNoState", 1000)) {
- slicer.handleMessage(MessageSliceReply.success(IDENTIFIER, 1, testProbe.ref()));
+ MessageSliceIdentifier messageSliceId = new MessageSliceIdentifier(IDENTIFIER, slicer.getId());
+ slicer.handleMessage(MessageSliceReply.success(messageSliceId, 1, testProbe.ref()));
final AbortSlicing abortSlicing = testProbe.expectMsgClass(AbortSlicing.class);
final AbortSlicing abortSlicing = testProbe.expectMsgClass(AbortSlicing.class);
- assertEquals("Identifier", IDENTIFIER, abortSlicing.getIdentifier());
+ assertEquals("Identifier", messageSliceId, abortSlicing.getIdentifier());