import java.io.ObjectOutput;
import java.util.concurrent.atomic.AtomicLong;
import org.opendaylight.yangtools.concepts.Identifier;
+import org.opendaylight.yangtools.concepts.WritableObjects;
/**
* Identifier for a message slice that is composed of a client-supplied Identifier and an internal counter value.
private static final AtomicLong ID_COUNTER = new AtomicLong(1);
private final Identifier clientIdentifier;
+ private final long slicerId;
private final long messageId;
- MessageSliceIdentifier(final Identifier clientIdentifier) {
- this(clientIdentifier, ID_COUNTER.getAndIncrement());
+ MessageSliceIdentifier(final Identifier clientIdentifier, final long slicerId) {
+ this(clientIdentifier, slicerId, ID_COUNTER.getAndIncrement());
}
- private MessageSliceIdentifier(final Identifier clientIdentifier, final long messageId) {
+ private MessageSliceIdentifier(final Identifier clientIdentifier, final long slicerId, final long messageId) {
this.clientIdentifier = Preconditions.checkNotNull(clientIdentifier);
this.messageId = messageId;
+ this.slicerId = slicerId;
}
Identifier getClientIdentifier() {
return clientIdentifier;
}
+ long getSlicerId() {
+ return slicerId;
+ }
+
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + clientIdentifier.hashCode();
result = prime * result + (int) (messageId ^ messageId >>> 32);
+ result = prime * result + (int) (slicerId ^ slicerId >>> 32);
return result;
}
}
MessageSliceIdentifier other = (MessageSliceIdentifier) obj;
- return other.clientIdentifier.equals(clientIdentifier) && other.messageId == messageId;
+ return other.clientIdentifier.equals(clientIdentifier) && other.slicerId == slicerId
+ && other.messageId == messageId;
}
@Override
public String toString() {
- return "MessageSliceIdentifier [clientIdentifier=" + clientIdentifier + ", messageId=" + messageId + "]";
+ return "MessageSliceIdentifier [clientIdentifier=" + clientIdentifier + ", slicerId=" + slicerId
+ + ", messageId=" + messageId + "]";
}
private Object writeReplace() {
@Override
public void writeExternal(ObjectOutput out) throws IOException {
out.writeObject(messageSliceId.clientIdentifier);
- out.writeLong(messageSliceId.messageId);
+ WritableObjects.writeLongs(out, messageSliceId.slicerId, messageSliceId.messageId);
}
@Override
public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
- messageSliceId = new MessageSliceIdentifier((Identifier) in.readObject(), in.readLong());
+ final Identifier clientIdentifier = (Identifier) in.readObject();
+ final byte header = WritableObjects.readLongHeader(in);
+ final long slicerId = WritableObjects.readFirstLong(in, header);
+ final long messageId = WritableObjects.readSecondLong(in, header);
+ messageSliceId = new MessageSliceIdentifier(clientIdentifier, slicerId, messageId);
}
private Object readResolve() {
import java.io.Serializable;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
import org.opendaylight.controller.cluster.io.FileBackedOutputStream;
import org.opendaylight.controller.cluster.io.FileBackedOutputStreamFactory;
import org.opendaylight.yangtools.concepts.Identifier;
*/
public class MessageSlicer implements AutoCloseable {
private static final Logger LOG = LoggerFactory.getLogger(MessageSlicer.class);
+ private static final AtomicLong SLICER_ID_COUNTER = new AtomicLong(1);
public static final int DEFAULT_MAX_SLICING_TRIES = 3;
private final Cache<Identifier, SlicedMessageState<ActorRef>> stateCache;
private final int messageSliceSize;
private final int maxSlicingTries;
private final String logContext;
+ private final long id;
private MessageSlicer(Builder builder) {
this.filedBackedStreamFactory = builder.filedBackedStreamFactory;
this.messageSliceSize = builder.messageSliceSize;
this.maxSlicingTries = builder.maxSlicingTries;
- this.logContext = builder.logContext;
+
+ id = SLICER_ID_COUNTER.getAndIncrement();
+ this.logContext = builder.logContext + "_slicer-id-" + id;
CacheBuilder<Identifier, SlicedMessageState<ActorRef>> cacheBuilder = CacheBuilder.newBuilder().removalListener(
(RemovalListener<Identifier, SlicedMessageState<ActorRef>>) notification -> stateRemoved(notification));
stateCache = cacheBuilder.build();
}
+ @VisibleForTesting
+ long getId() {
+ return id;
+ }
+
/**
* Returns a new Builder for creating MessageSlicer instances.
*
if (message != null) {
LOG.debug("{}: slice: identifier: {}, message: {}", logContext, identifier, message);
-
Preconditions.checkNotNull(filedBackedStreamFactory,
"The FiledBackedStreamFactory must be set in order to call this slice method");
private void initializeSlicing(final SliceOptions options, final FileBackedOutputStream fileBackedStream) {
final Identifier identifier = options.getIdentifier();
- MessageSliceIdentifier messageSliceId = new MessageSliceIdentifier(identifier);
+ MessageSliceIdentifier messageSliceId = new MessageSliceIdentifier(identifier, id);
SlicedMessageState<ActorRef> state = null;
try {
state = new SlicedMessageState<>(messageSliceId, fileBackedStream, messageSliceSize, maxSlicingTries,
public boolean handleMessage(final Object message) {
if (message instanceof MessageSliceReply) {
LOG.debug("{}: handleMessage: {}", logContext, message);
- onMessageSliceReply((MessageSliceReply) message);
- return true;
+ return onMessageSliceReply((MessageSliceReply) message);
}
return false;
state.getTotalSlices(), state.getLastSliceHashCode(), state.getReplyTarget());
}
- private void onMessageSliceReply(final MessageSliceReply reply) {
+ private boolean onMessageSliceReply(final MessageSliceReply reply) {
final Identifier identifier = reply.getIdentifier();
+ if (!(identifier instanceof MessageSliceIdentifier)
+ || ((MessageSliceIdentifier)identifier).getSlicerId() != id) {
+ return false;
+ }
+
final SlicedMessageState<ActorRef> state = stateCache.getIfPresent(identifier);
if (state == null) {
LOG.warn("{}: SlicedMessageState not found for {}", logContext, reply);
reply.getSendTo().tell(new AbortSlicing(identifier), ActorRef.noSender());
- return;
+ return true;
}
synchronized (state) {
if (failure.isPresent()) {
LOG.warn("{}: Received failed {}", logContext, reply);
processMessageSliceException(failure.get(), state, reply.getSendTo());
- return;
+ return true;
}
if (state.getCurrentSliceIndex() != reply.getSliceIndex()) {
reply.getSliceIndex(), reply, state.getCurrentSliceIndex());
reply.getSendTo().tell(new AbortSlicing(identifier), ActorRef.noSender());
possiblyRetrySlicing(state, reply.getSendTo());
- return;
+ return true;
}
if (state.isLastSlice(reply.getSliceIndex())) {
fail(state, e);
}
}
+
+ return true;
}
private void processMessageSliceException(final MessageSliceException exception,
final FileBackedOutputStream fileBackStream = spy(new FileBackedOutputStream(100000000, null));
doReturn(fileBackStream).when(mockFiledBackedStreamFactory).newInstance();
- final MessageSliceIdentifier identifier = new MessageSliceIdentifier(IDENTIFIER);
+ final MessageSliceIdentifier identifier = new MessageSliceIdentifier(IDENTIFIER, 1);
final BytesMessage message = new BytesMessage(new byte[]{1, 2, 3});
final MessageSlice messageSlice = new MessageSlice(identifier, SerializationUtils.serialize(message), 1, 1,
doThrow(mockFailure).when(mockByteSource).openStream();
doThrow(mockFailure).when(mockByteSource).openBufferedStream();
- final MessageSliceIdentifier identifier = new MessageSliceIdentifier(IDENTIFIER);
+ final MessageSliceIdentifier identifier = new MessageSliceIdentifier(IDENTIFIER, 1);
final BytesMessage message = new BytesMessage(new byte[]{1, 2, 3});
final MessageSlice messageSlice = new MessageSlice(identifier, SerializationUtils.serialize(message), 1, 1,
doThrow(mockFailure).when(mockFiledBackedStream).write(anyInt());
doThrow(mockFailure).when(mockFiledBackedStream).flush();
- final MessageSliceIdentifier identifier = new MessageSliceIdentifier(IDENTIFIER);
+ final MessageSliceIdentifier identifier = new MessageSliceIdentifier(IDENTIFIER, 1);
final BytesMessage message = new BytesMessage(new byte[]{1, 2, 3});
final MessageSlice messageSlice = new MessageSlice(identifier, SerializationUtils.serialize(message), 1, 1,
final int expiryDuration = 200;
try (MessageAssembler assembler = newMessageAssemblerBuilder("testAssembledMessageStateExpiration")
.expireStateAfterInactivity(expiryDuration, TimeUnit.MILLISECONDS).build()) {
- final MessageSliceIdentifier identifier = new MessageSliceIdentifier(IDENTIFIER);
+ final MessageSliceIdentifier identifier = new MessageSliceIdentifier(IDENTIFIER, 1);
final BytesMessage message = new BytesMessage(new byte[]{1, 2, 3});
final MessageSlice messageSlice = new MessageSlice(identifier, SerializationUtils.serialize(message), 1, 2,
@Test
public void testFirstMessageSliceWithInvalidIndex() {
try (MessageAssembler assembler = newMessageAssembler("testFirstMessageSliceWithInvalidIndex")) {
- final MessageSliceIdentifier identifier = new MessageSliceIdentifier(IDENTIFIER);
+ final MessageSliceIdentifier identifier = new MessageSliceIdentifier(IDENTIFIER, 1);
final MessageSlice messageSlice = new MessageSlice(identifier, new byte[0], 2, 3, 1, testProbe.ref());
assembler.handleMessage(messageSlice, testProbe.ref());
@Test
public void testSerialization() {
- MessageSliceIdentifier expected = new MessageSliceIdentifier(new StringIdentifier("test"));
+ MessageSliceIdentifier expected = new MessageSliceIdentifier(new StringIdentifier("test"), 123L);
MessageSliceIdentifier cloned = (MessageSliceIdentifier) SerializationUtils.clone(expected);
assertEquals("cloned", expected, cloned);
+ assertEquals("getClientIdentifier", expected.getClientIdentifier(), cloned.getClientIdentifier());
+ assertEquals("getSlicerId", expected.getSlicerId(), cloned.getSlicerId());
}
}
@Test
public void testHandledMessages() {
- final MessageSliceReply reply = MessageSliceReply.success(IDENTIFIER, 1, testProbe.ref());
- assertEquals("isHandledMessage", Boolean.TRUE, MessageSlicer.isHandledMessage(reply));
- assertEquals("isHandledMessage", Boolean.FALSE, MessageSlicer.isHandledMessage(new Object()));
-
try (MessageSlicer slicer = newMessageSlicer("testHandledMessages", 100)) {
+ MessageSliceIdentifier messageSliceId = new MessageSliceIdentifier(IDENTIFIER, slicer.getId());
+ final MessageSliceReply reply = MessageSliceReply.success(messageSliceId, 1, testProbe.ref());
+ assertEquals("isHandledMessage", Boolean.TRUE, MessageSlicer.isHandledMessage(reply));
+ assertEquals("isHandledMessage", Boolean.FALSE, MessageSlicer.isHandledMessage(new Object()));
+
assertEquals("handledMessage", Boolean.TRUE, slicer.handleMessage(reply));
assertEquals("handledMessage", Boolean.FALSE, slicer.handleMessage(new Object()));
+ assertEquals("handledMessage", Boolean.FALSE, slicer.handleMessage(MessageSliceReply.success(
+ IDENTIFIER, 1,testProbe.ref())));
+ assertEquals("handledMessage", Boolean.FALSE, slicer.handleMessage(MessageSliceReply.success(
+ new MessageSliceIdentifier(IDENTIFIER, slicer.getId() + 1), 1,testProbe.ref())));
}
}
@Test
public void testMessageSliceReplyWithNoState() {
try (MessageSlicer slicer = newMessageSlicer("testMessageSliceReplyWithNoState", 1000)) {
- slicer.handleMessage(MessageSliceReply.success(IDENTIFIER, 1, testProbe.ref()));
+ MessageSliceIdentifier messageSliceId = new MessageSliceIdentifier(IDENTIFIER, slicer.getId());
+ slicer.handleMessage(MessageSliceReply.success(messageSliceId, 1, testProbe.ref()));
final AbortSlicing abortSlicing = testProbe.expectMsgClass(AbortSlicing.class);
- assertEquals("Identifier", IDENTIFIER, abortSlicing.getIdentifier());
+ assertEquals("Identifier", messageSliceId, abortSlicing.getIdentifier());
}
}