package org.opendaylight.controller.cluster.messaging;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyInt;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
+import org.opendaylight.controller.cluster.io.FileBackedOutputStream;
import org.opendaylight.yangtools.concepts.Identifier;
/**
@Test
public void testHandledMessages() {
- final MessageSliceReply reply = MessageSliceReply.success(IDENTIFIER, 1, testProbe.ref());
- assertEquals("isHandledMessage", Boolean.TRUE, MessageSlicer.isHandledMessage(reply));
- assertEquals("isHandledMessage", Boolean.FALSE, MessageSlicer.isHandledMessage(new Object()));
-
try (MessageSlicer slicer = newMessageSlicer("testHandledMessages", 100)) {
+ MessageSliceIdentifier messageSliceId = new MessageSliceIdentifier(IDENTIFIER, slicer.getId());
+ final MessageSliceReply reply = MessageSliceReply.success(messageSliceId, 1, testProbe.ref());
+ assertEquals("isHandledMessage", Boolean.TRUE, MessageSlicer.isHandledMessage(reply));
+ assertEquals("isHandledMessage", Boolean.FALSE, MessageSlicer.isHandledMessage(new Object()));
+
assertEquals("handledMessage", Boolean.TRUE, slicer.handleMessage(reply));
assertEquals("handledMessage", Boolean.FALSE, slicer.handleMessage(new Object()));
+ assertEquals("handledMessage", Boolean.FALSE, slicer.handleMessage(MessageSliceReply.success(
+ IDENTIFIER, 1,testProbe.ref())));
+ assertEquals("handledMessage", Boolean.FALSE, slicer.handleMessage(MessageSliceReply.success(
+ new MessageSliceIdentifier(IDENTIFIER, slicer.getId() + 1), 1,testProbe.ref())));
}
}
doThrow(mockFailure).when(mockFiledBackedStream).flush();
try (MessageSlicer slicer = newMessageSlicer("testSliceWithFailedSerialization", 100)) {
- slice(slicer, IDENTIFIER, new BytesMessage(new byte[]{}), testProbe.ref(), testProbe.ref(),
- mockOnFailureCallback);
+ final boolean wasSliced = slice(slicer, IDENTIFIER, new BytesMessage(new byte[]{}), testProbe.ref(),
+ testProbe.ref(), mockOnFailureCallback);
+ assertFalse(wasSliced);
assertFailureCallback(IOException.class);
verify(mockFiledBackedStream).cleanup();
doThrow(mockFailure).when(mockByteSource).openBufferedStream();
try (MessageSlicer slicer = newMessageSlicer("testSliceWithByteSourceFailure", 100)) {
- slice(slicer, IDENTIFIER, new BytesMessage(new byte[]{}), testProbe.ref(), testProbe.ref(),
- mockOnFailureCallback);
+ final boolean wasSliced = slice(slicer, IDENTIFIER, new BytesMessage(new byte[]{}), testProbe.ref(),
+ testProbe.ref(), mockOnFailureCallback);
+ assertFalse(wasSliced);
assertFailureCallback(IOException.class);
verify(mockFiledBackedStream).cleanup();
doReturn(0).when(mockInputStream).read(any(byte[].class));
try (MessageSlicer slicer = newMessageSlicer("testSliceWithInputStreamFailure", 2)) {
- slice(slicer, IDENTIFIER, new BytesMessage(new byte[]{}), testProbe.ref(), testProbe.ref(),
- mockOnFailureCallback);
+ final boolean wasSliced = slice(slicer, IDENTIFIER, new BytesMessage(new byte[]{}), testProbe.ref(),
+ testProbe.ref(), mockOnFailureCallback);
+ assertFalse(wasSliced);
assertFailureCallback(IOException.class);
verify(mockFiledBackedStream).cleanup();
@Test
public void testMessageSliceReplyWithNoState() {
try (MessageSlicer slicer = newMessageSlicer("testMessageSliceReplyWithNoState", 1000)) {
- slicer.handleMessage(MessageSliceReply.success(IDENTIFIER, 1, testProbe.ref()));
+ MessageSliceIdentifier messageSliceId = new MessageSliceIdentifier(IDENTIFIER, slicer.getId());
+ slicer.handleMessage(MessageSliceReply.success(messageSliceId, 1, testProbe.ref()));
final AbortSlicing abortSlicing = testProbe.expectMsgClass(AbortSlicing.class);
- assertEquals("Identifier", IDENTIFIER, abortSlicing.getIdentifier());
+ assertEquals("Identifier", messageSliceId, abortSlicing.getIdentifier());
}
}
verifyNoMoreInteractions(mockOnFailureCallback);
}
+ @Test
+ public void testCancelSlicing() throws IOException {
+ doReturn(1).when(mockInputStream).read(any(byte[].class));
+
+ final MessageSlicer slicer = newMessageSlicer("testCloseAllSlicedMessageState", 1);
+ slicer.slice(SliceOptions.builder().identifier(IDENTIFIER).fileBackedOutputStream(mockFiledBackedStream)
+ .sendTo(testProbe.ref()).replyTo(testProbe.ref()).onFailureCallback(mockOnFailureCallback).build());
+
+ final FileBackedOutputStream mockFiledBackedStream2 = mock(FileBackedOutputStream.class);
+ setupMockFiledBackedStream(mockFiledBackedStream2);
+ slicer.slice(SliceOptions.builder().identifier(new StringIdentifier("test2"))
+ .fileBackedOutputStream(mockFiledBackedStream2).sendTo(testProbe.ref()).replyTo(testProbe.ref())
+ .onFailureCallback(mockOnFailureCallback).build());
+
+ slicer.cancelSlicing(id -> id.equals(IDENTIFIER));
+
+ verify(mockFiledBackedStream).cleanup();
+ verify(mockFiledBackedStream2, never()).cleanup();
+ verifyNoMoreInteractions(mockOnFailureCallback);
+ }
+
@Test
public void testCheckExpiredSlicedMessageState() throws IOException {
doReturn(1).when(mockInputStream).read(any(byte[].class));
final int expiryDuration = 200;
try (MessageSlicer slicer = MessageSlicer.builder().messageSliceSize(1)
.logContext("testCheckExpiredSlicedMessageState")
- .filedBackedStreamFactory(mockFiledBackedStreamFactory)
+ .fileBackedStreamFactory(mockFiledBackedStreamFactory)
.expireStateAfterInactivity(expiryDuration, TimeUnit.MILLISECONDS).build()) {
slice(slicer, IDENTIFIER, new BytesMessage(new byte[]{1, 2}), testProbe.ref(), testProbe.ref(),
mockOnFailureCallback);
private MessageSlicer newMessageSlicer(String logContext, final int messageSliceSize) {
return MessageSlicer.builder().messageSliceSize(messageSliceSize).logContext(logContext)
- .filedBackedStreamFactory(mockFiledBackedStreamFactory).build();
+ .fileBackedStreamFactory(mockFiledBackedStreamFactory).build();
}
- static void slice(MessageSlicer slicer, Identifier identifier, Serializable message, ActorRef sendTo,
+ static boolean slice(MessageSlicer slicer, Identifier identifier, Serializable message, ActorRef sendTo,
ActorRef replyTo, Consumer<Throwable> onFailureCallback) {
- slicer.slice(SliceOptions.builder().identifier(identifier).message(message).sendTo(sendTo).replyTo(replyTo)
- .onFailureCallback(onFailureCallback).build());
+ return slicer.slice(SliceOptions.builder().identifier(identifier).message(message).sendTo(sendTo)
+ .replyTo(replyTo).onFailureCallback(onFailureCallback).build());
}
}