2 * Copyright (c) 2017 Inocybe Technologies and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.messaging;
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.assertFalse;
12 import static org.mockito.Matchers.any;
13 import static org.mockito.Matchers.anyInt;
14 import static org.mockito.Mockito.doNothing;
15 import static org.mockito.Mockito.doReturn;
16 import static org.mockito.Mockito.doThrow;
17 import static org.mockito.Mockito.mock;
18 import static org.mockito.Mockito.never;
19 import static org.mockito.Mockito.verify;
20 import static org.mockito.Mockito.verifyNoMoreInteractions;
22 import akka.actor.ActorRef;
23 import com.google.common.util.concurrent.Uninterruptibles;
24 import java.io.IOException;
25 import java.io.Serializable;
26 import java.util.concurrent.TimeUnit;
27 import java.util.function.Consumer;
28 import org.junit.Before;
29 import org.junit.Test;
30 import org.mockito.ArgumentCaptor;
31 import org.mockito.Mock;
32 import org.opendaylight.controller.cluster.io.FileBackedOutputStream;
33 import org.opendaylight.yangtools.concepts.Identifier;
36 * Unit tests for MessageSlicer.
38 * @author Thomas Pantelis
40 public class MessageSlicerTest extends AbstractMessagingTest {
42 private Consumer<Throwable> mockOnFailureCallback;
46 public void setup() throws IOException {
49 doNothing().when(mockOnFailureCallback).accept(any(Throwable.class));
53 public void testHandledMessages() {
54 try (MessageSlicer slicer = newMessageSlicer("testHandledMessages", 100)) {
55 MessageSliceIdentifier messageSliceId = new MessageSliceIdentifier(IDENTIFIER, slicer.getId());
56 final MessageSliceReply reply = MessageSliceReply.success(messageSliceId, 1, testProbe.ref());
57 assertEquals("isHandledMessage", Boolean.TRUE, MessageSlicer.isHandledMessage(reply));
58 assertEquals("isHandledMessage", Boolean.FALSE, MessageSlicer.isHandledMessage(new Object()));
60 assertEquals("handledMessage", Boolean.TRUE, slicer.handleMessage(reply));
61 assertEquals("handledMessage", Boolean.FALSE, slicer.handleMessage(new Object()));
62 assertEquals("handledMessage", Boolean.FALSE, slicer.handleMessage(MessageSliceReply.success(
63 IDENTIFIER, 1,testProbe.ref())));
64 assertEquals("handledMessage", Boolean.FALSE, slicer.handleMessage(MessageSliceReply.success(
65 new MessageSliceIdentifier(IDENTIFIER, slicer.getId() + 1), 1,testProbe.ref())));
70 public void testSliceWithFailedSerialization() throws IOException {
71 IOException mockFailure = new IOException("mock IOException");
72 doThrow(mockFailure).when(mockFiledBackedStream).write(any(byte[].class), anyInt(), anyInt());
73 doThrow(mockFailure).when(mockFiledBackedStream).write(any(byte[].class));
74 doThrow(mockFailure).when(mockFiledBackedStream).write(anyInt());
75 doThrow(mockFailure).when(mockFiledBackedStream).flush();
77 try (MessageSlicer slicer = newMessageSlicer("testSliceWithFailedSerialization", 100)) {
78 final boolean wasSliced = slice(slicer, IDENTIFIER, new BytesMessage(new byte[]{}), testProbe.ref(),
79 testProbe.ref(), mockOnFailureCallback);
80 assertFalse(wasSliced);
82 assertFailureCallback(IOException.class);
83 verify(mockFiledBackedStream).cleanup();
88 public void testSliceWithByteSourceFailure() throws IOException {
89 IOException mockFailure = new IOException("mock IOException");
90 doThrow(mockFailure).when(mockByteSource).openStream();
91 doThrow(mockFailure).when(mockByteSource).openBufferedStream();
93 try (MessageSlicer slicer = newMessageSlicer("testSliceWithByteSourceFailure", 100)) {
94 final boolean wasSliced = slice(slicer, IDENTIFIER, new BytesMessage(new byte[]{}), testProbe.ref(),
95 testProbe.ref(), mockOnFailureCallback);
96 assertFalse(wasSliced);
98 assertFailureCallback(IOException.class);
99 verify(mockFiledBackedStream).cleanup();
104 public void testSliceWithInputStreamFailure() throws IOException {
105 doReturn(0).when(mockInputStream).read(any(byte[].class));
107 try (MessageSlicer slicer = newMessageSlicer("testSliceWithInputStreamFailure", 2)) {
108 final boolean wasSliced = slice(slicer, IDENTIFIER, new BytesMessage(new byte[]{}), testProbe.ref(),
109 testProbe.ref(), mockOnFailureCallback);
110 assertFalse(wasSliced);
112 assertFailureCallback(IOException.class);
113 verify(mockFiledBackedStream).cleanup();
118 public void testMessageSliceReplyWithNoState() {
119 try (MessageSlicer slicer = newMessageSlicer("testMessageSliceReplyWithNoState", 1000)) {
120 MessageSliceIdentifier messageSliceId = new MessageSliceIdentifier(IDENTIFIER, slicer.getId());
121 slicer.handleMessage(MessageSliceReply.success(messageSliceId, 1, testProbe.ref()));
122 final AbortSlicing abortSlicing = testProbe.expectMsgClass(AbortSlicing.class);
123 assertEquals("Identifier", messageSliceId, abortSlicing.getIdentifier());
128 public void testCloseAllSlicedMessageState() throws IOException {
129 doReturn(1).when(mockInputStream).read(any(byte[].class));
131 final MessageSlicer slicer = newMessageSlicer("testCloseAllSlicedMessageState", 1);
132 slice(slicer, IDENTIFIER, new BytesMessage(new byte[]{1, 2}), testProbe.ref(), testProbe.ref(),
133 mockOnFailureCallback);
137 verify(mockFiledBackedStream).cleanup();
138 verifyNoMoreInteractions(mockOnFailureCallback);
142 public void testCancelSlicing() throws IOException {
143 doReturn(1).when(mockInputStream).read(any(byte[].class));
145 final MessageSlicer slicer = newMessageSlicer("testCloseAllSlicedMessageState", 1);
146 slicer.slice(SliceOptions.builder().identifier(IDENTIFIER).fileBackedOutputStream(mockFiledBackedStream)
147 .sendTo(testProbe.ref()).replyTo(testProbe.ref()).onFailureCallback(mockOnFailureCallback).build());
149 final FileBackedOutputStream mockFiledBackedStream2 = mock(FileBackedOutputStream.class);
150 setupMockFiledBackedStream(mockFiledBackedStream2);
151 slicer.slice(SliceOptions.builder().identifier(new StringIdentifier("test2"))
152 .fileBackedOutputStream(mockFiledBackedStream2).sendTo(testProbe.ref()).replyTo(testProbe.ref())
153 .onFailureCallback(mockOnFailureCallback).build());
155 slicer.cancelSlicing(id -> id.equals(IDENTIFIER));
157 verify(mockFiledBackedStream).cleanup();
158 verify(mockFiledBackedStream2, never()).cleanup();
159 verifyNoMoreInteractions(mockOnFailureCallback);
163 public void testCheckExpiredSlicedMessageState() throws IOException {
164 doReturn(1).when(mockInputStream).read(any(byte[].class));
166 final int expiryDuration = 200;
167 try (MessageSlicer slicer = MessageSlicer.builder().messageSliceSize(1)
168 .logContext("testCheckExpiredSlicedMessageState")
169 .fileBackedStreamFactory(mockFiledBackedStreamFactory)
170 .expireStateAfterInactivity(expiryDuration, TimeUnit.MILLISECONDS).build()) {
171 slice(slicer, IDENTIFIER, new BytesMessage(new byte[]{1, 2}), testProbe.ref(), testProbe.ref(),
172 mockOnFailureCallback);
174 Uninterruptibles.sleepUninterruptibly(expiryDuration + 50, TimeUnit.MILLISECONDS);
175 slicer.checkExpiredSlicedMessageState();
177 assertFailureCallback(RuntimeException.class);
178 verify(mockFiledBackedStream).cleanup();
182 private void assertFailureCallback(final Class<?> exceptionType) {
183 ArgumentCaptor<Throwable> exceptionCaptor = ArgumentCaptor.forClass(Throwable.class);
184 verify(mockOnFailureCallback).accept(exceptionCaptor.capture());
185 assertEquals("Exception type", exceptionType, exceptionCaptor.getValue().getClass());
188 private MessageSlicer newMessageSlicer(String logContext, final int messageSliceSize) {
189 return MessageSlicer.builder().messageSliceSize(messageSliceSize).logContext(logContext)
190 .fileBackedStreamFactory(mockFiledBackedStreamFactory).build();
193 static boolean slice(MessageSlicer slicer, Identifier identifier, Serializable message, ActorRef sendTo,
194 ActorRef replyTo, Consumer<Throwable> onFailureCallback) {
195 return slicer.slice(SliceOptions.builder().identifier(identifier).message(message).sendTo(sendTo)
196 .replyTo(replyTo).onFailureCallback(onFailureCallback).build());