/*
 * Copyright (c) 2017 Inocybe Technologies and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
8 package org.opendaylight.controller.cluster.messaging;
10 import static org.junit.Assert.assertEquals;
11 import static org.mockito.Matchers.any;
12 import static org.mockito.Matchers.anyInt;
13 import static org.mockito.Mockito.doNothing;
14 import static org.mockito.Mockito.doReturn;
15 import static org.mockito.Mockito.doThrow;
16 import static org.mockito.Mockito.verify;
17 import static org.mockito.Mockito.verifyNoMoreInteractions;
19 import akka.actor.ActorRef;
20 import com.google.common.util.concurrent.Uninterruptibles;
21 import java.io.IOException;
22 import java.io.Serializable;
23 import java.util.concurrent.TimeUnit;
24 import java.util.function.Consumer;
25 import org.junit.Before;
26 import org.junit.Test;
27 import org.mockito.ArgumentCaptor;
28 import org.mockito.Mock;
29 import org.opendaylight.yangtools.concepts.Identifier;
/**
 * Unit tests for MessageSlicer.
 *
 * @author Thomas Pantelis
 */
36 public class MessageSlicerTest extends AbstractMessagingTest {
38 private Consumer<Throwable> mockOnFailureCallback;
42 public void setup() throws IOException {
45 doNothing().when(mockOnFailureCallback).accept(any(Throwable.class));
49 public void testHandledMessages() {
50 try (MessageSlicer slicer = newMessageSlicer("testHandledMessages", 100)) {
51 MessageSliceIdentifier messageSliceId = new MessageSliceIdentifier(IDENTIFIER, slicer.getId());
52 final MessageSliceReply reply = MessageSliceReply.success(messageSliceId, 1, testProbe.ref());
53 assertEquals("isHandledMessage", Boolean.TRUE, MessageSlicer.isHandledMessage(reply));
54 assertEquals("isHandledMessage", Boolean.FALSE, MessageSlicer.isHandledMessage(new Object()));
56 assertEquals("handledMessage", Boolean.TRUE, slicer.handleMessage(reply));
57 assertEquals("handledMessage", Boolean.FALSE, slicer.handleMessage(new Object()));
58 assertEquals("handledMessage", Boolean.FALSE, slicer.handleMessage(MessageSliceReply.success(
59 IDENTIFIER, 1,testProbe.ref())));
60 assertEquals("handledMessage", Boolean.FALSE, slicer.handleMessage(MessageSliceReply.success(
61 new MessageSliceIdentifier(IDENTIFIER, slicer.getId() + 1), 1,testProbe.ref())));
66 public void testSliceWithFailedSerialization() throws IOException {
67 IOException mockFailure = new IOException("mock IOException");
68 doThrow(mockFailure).when(mockFiledBackedStream).write(any(byte[].class), anyInt(), anyInt());
69 doThrow(mockFailure).when(mockFiledBackedStream).write(any(byte[].class));
70 doThrow(mockFailure).when(mockFiledBackedStream).write(anyInt());
71 doThrow(mockFailure).when(mockFiledBackedStream).flush();
73 try (MessageSlicer slicer = newMessageSlicer("testSliceWithFailedSerialization", 100)) {
74 slice(slicer, IDENTIFIER, new BytesMessage(new byte[]{}), testProbe.ref(), testProbe.ref(),
75 mockOnFailureCallback);
77 assertFailureCallback(IOException.class);
78 verify(mockFiledBackedStream).cleanup();
83 public void testSliceWithByteSourceFailure() throws IOException {
84 IOException mockFailure = new IOException("mock IOException");
85 doThrow(mockFailure).when(mockByteSource).openStream();
86 doThrow(mockFailure).when(mockByteSource).openBufferedStream();
88 try (MessageSlicer slicer = newMessageSlicer("testSliceWithByteSourceFailure", 100)) {
89 slice(slicer, IDENTIFIER, new BytesMessage(new byte[]{}), testProbe.ref(), testProbe.ref(),
90 mockOnFailureCallback);
92 assertFailureCallback(IOException.class);
93 verify(mockFiledBackedStream).cleanup();
98 public void testSliceWithInputStreamFailure() throws IOException {
99 doReturn(0).when(mockInputStream).read(any(byte[].class));
101 try (MessageSlicer slicer = newMessageSlicer("testSliceWithInputStreamFailure", 2)) {
102 slice(slicer, IDENTIFIER, new BytesMessage(new byte[]{}), testProbe.ref(), testProbe.ref(),
103 mockOnFailureCallback);
105 assertFailureCallback(IOException.class);
106 verify(mockFiledBackedStream).cleanup();
111 public void testMessageSliceReplyWithNoState() {
112 try (MessageSlicer slicer = newMessageSlicer("testMessageSliceReplyWithNoState", 1000)) {
113 MessageSliceIdentifier messageSliceId = new MessageSliceIdentifier(IDENTIFIER, slicer.getId());
114 slicer.handleMessage(MessageSliceReply.success(messageSliceId, 1, testProbe.ref()));
115 final AbortSlicing abortSlicing = testProbe.expectMsgClass(AbortSlicing.class);
116 assertEquals("Identifier", messageSliceId, abortSlicing.getIdentifier());
121 public void testCloseAllSlicedMessageState() throws IOException {
122 doReturn(1).when(mockInputStream).read(any(byte[].class));
124 final MessageSlicer slicer = newMessageSlicer("testCloseAllSlicedMessageState", 1);
125 slice(slicer, IDENTIFIER, new BytesMessage(new byte[]{1, 2}), testProbe.ref(), testProbe.ref(),
126 mockOnFailureCallback);
130 verify(mockFiledBackedStream).cleanup();
131 verifyNoMoreInteractions(mockOnFailureCallback);
135 public void testCheckExpiredSlicedMessageState() throws IOException {
136 doReturn(1).when(mockInputStream).read(any(byte[].class));
138 final int expiryDuration = 200;
139 try (MessageSlicer slicer = MessageSlicer.builder().messageSliceSize(1)
140 .logContext("testCheckExpiredSlicedMessageState")
141 .fileBackedStreamFactory(mockFiledBackedStreamFactory)
142 .expireStateAfterInactivity(expiryDuration, TimeUnit.MILLISECONDS).build()) {
143 slice(slicer, IDENTIFIER, new BytesMessage(new byte[]{1, 2}), testProbe.ref(), testProbe.ref(),
144 mockOnFailureCallback);
146 Uninterruptibles.sleepUninterruptibly(expiryDuration + 50, TimeUnit.MILLISECONDS);
147 slicer.checkExpiredSlicedMessageState();
149 assertFailureCallback(RuntimeException.class);
150 verify(mockFiledBackedStream).cleanup();
154 private void assertFailureCallback(final Class<?> exceptionType) {
155 ArgumentCaptor<Throwable> exceptionCaptor = ArgumentCaptor.forClass(Throwable.class);
156 verify(mockOnFailureCallback).accept(exceptionCaptor.capture());
157 assertEquals("Exception type", exceptionType, exceptionCaptor.getValue().getClass());
160 private MessageSlicer newMessageSlicer(String logContext, final int messageSliceSize) {
161 return MessageSlicer.builder().messageSliceSize(messageSliceSize).logContext(logContext)
162 .fileBackedStreamFactory(mockFiledBackedStreamFactory).build();
165 static void slice(MessageSlicer slicer, Identifier identifier, Serializable message, ActorRef sendTo,
166 ActorRef replyTo, Consumer<Throwable> onFailureCallback) {
167 slicer.slice(SliceOptions.builder().identifier(identifier).message(message).sendTo(sendTo).replyTo(replyTo)
168 .onFailureCallback(onFailureCallback).build());