ac6ab2e447b52e61b49e00c348f4af245886795e
[controller.git] / opendaylight / md-sal / sal-clustering-commons / src / test / java / org / opendaylight / controller / cluster / messaging / MessageSlicerTest.java
1 /*
2  * Copyright (c) 2017 Inocybe Technologies and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.messaging;
9
10 import static org.junit.Assert.assertEquals;
11 import static org.mockito.Matchers.any;
12 import static org.mockito.Matchers.anyInt;
13 import static org.mockito.Mockito.doNothing;
14 import static org.mockito.Mockito.doReturn;
15 import static org.mockito.Mockito.doThrow;
16 import static org.mockito.Mockito.verify;
17 import static org.mockito.Mockito.verifyNoMoreInteractions;
18
19 import akka.actor.ActorRef;
20 import com.google.common.util.concurrent.Uninterruptibles;
21 import java.io.IOException;
22 import java.io.Serializable;
23 import java.util.concurrent.TimeUnit;
24 import java.util.function.Consumer;
25 import org.junit.Before;
26 import org.junit.Test;
27 import org.mockito.ArgumentCaptor;
28 import org.mockito.Mock;
29 import org.opendaylight.yangtools.concepts.Identifier;
30
31 /**
32  * Unit tests for MessageSlicer.
33  *
34  * @author Thomas Pantelis
35  */
36 public class MessageSlicerTest extends AbstractMessagingTest {
37     @Mock
38     private Consumer<Throwable> mockOnFailureCallback;
39
40     @Override
41     @Before
42     public void setup() throws IOException {
43         super.setup();
44
45         doNothing().when(mockOnFailureCallback).accept(any(Throwable.class));
46     }
47
48     @Test
49     public void testHandledMessages() {
50         try (MessageSlicer slicer = newMessageSlicer("testHandledMessages", 100)) {
51             MessageSliceIdentifier messageSliceId = new MessageSliceIdentifier(IDENTIFIER, slicer.getId());
52             final MessageSliceReply reply = MessageSliceReply.success(messageSliceId, 1, testProbe.ref());
53             assertEquals("isHandledMessage", Boolean.TRUE, MessageSlicer.isHandledMessage(reply));
54             assertEquals("isHandledMessage", Boolean.FALSE, MessageSlicer.isHandledMessage(new Object()));
55
56             assertEquals("handledMessage", Boolean.TRUE, slicer.handleMessage(reply));
57             assertEquals("handledMessage", Boolean.FALSE, slicer.handleMessage(new Object()));
58             assertEquals("handledMessage", Boolean.FALSE, slicer.handleMessage(MessageSliceReply.success(
59                     IDENTIFIER, 1,testProbe.ref())));
60             assertEquals("handledMessage", Boolean.FALSE, slicer.handleMessage(MessageSliceReply.success(
61                     new MessageSliceIdentifier(IDENTIFIER, slicer.getId() + 1), 1,testProbe.ref())));
62         }
63     }
64
65     @Test
66     public void testSliceWithFailedSerialization() throws IOException {
67         IOException mockFailure = new IOException("mock IOException");
68         doThrow(mockFailure).when(mockFiledBackedStream).write(any(byte[].class), anyInt(), anyInt());
69         doThrow(mockFailure).when(mockFiledBackedStream).write(any(byte[].class));
70         doThrow(mockFailure).when(mockFiledBackedStream).write(anyInt());
71         doThrow(mockFailure).when(mockFiledBackedStream).flush();
72
73         try (MessageSlicer slicer = newMessageSlicer("testSliceWithFailedSerialization", 100)) {
74             slice(slicer, IDENTIFIER, new BytesMessage(new byte[]{}), testProbe.ref(), testProbe.ref(),
75                     mockOnFailureCallback);
76
77             assertFailureCallback(IOException.class);
78             verify(mockFiledBackedStream).cleanup();
79         }
80     }
81
82     @Test
83     public void testSliceWithByteSourceFailure() throws IOException {
84         IOException mockFailure = new IOException("mock IOException");
85         doThrow(mockFailure).when(mockByteSource).openStream();
86         doThrow(mockFailure).when(mockByteSource).openBufferedStream();
87
88         try (MessageSlicer slicer = newMessageSlicer("testSliceWithByteSourceFailure", 100)) {
89             slice(slicer, IDENTIFIER, new BytesMessage(new byte[]{}), testProbe.ref(), testProbe.ref(),
90                     mockOnFailureCallback);
91
92             assertFailureCallback(IOException.class);
93             verify(mockFiledBackedStream).cleanup();
94         }
95     }
96
97     @Test
98     public void testSliceWithInputStreamFailure() throws IOException {
99         doReturn(0).when(mockInputStream).read(any(byte[].class));
100
101         try (MessageSlicer slicer = newMessageSlicer("testSliceWithInputStreamFailure", 2)) {
102             slice(slicer, IDENTIFIER, new BytesMessage(new byte[]{}), testProbe.ref(), testProbe.ref(),
103                     mockOnFailureCallback);
104
105             assertFailureCallback(IOException.class);
106             verify(mockFiledBackedStream).cleanup();
107         }
108     }
109
110     @Test
111     public void testMessageSliceReplyWithNoState() {
112         try (MessageSlicer slicer = newMessageSlicer("testMessageSliceReplyWithNoState", 1000)) {
113             MessageSliceIdentifier messageSliceId = new MessageSliceIdentifier(IDENTIFIER, slicer.getId());
114             slicer.handleMessage(MessageSliceReply.success(messageSliceId, 1, testProbe.ref()));
115             final AbortSlicing abortSlicing = testProbe.expectMsgClass(AbortSlicing.class);
116             assertEquals("Identifier", messageSliceId, abortSlicing.getIdentifier());
117         }
118     }
119
120     @Test
121     public void testCloseAllSlicedMessageState() throws IOException {
122         doReturn(1).when(mockInputStream).read(any(byte[].class));
123
124         final MessageSlicer slicer = newMessageSlicer("testCloseAllSlicedMessageState", 1);
125         slice(slicer, IDENTIFIER, new BytesMessage(new byte[]{1, 2}), testProbe.ref(), testProbe.ref(),
126                 mockOnFailureCallback);
127
128         slicer.close();
129
130         verify(mockFiledBackedStream).cleanup();
131         verifyNoMoreInteractions(mockOnFailureCallback);
132     }
133
134     @Test
135     public void testCheckExpiredSlicedMessageState() throws IOException {
136         doReturn(1).when(mockInputStream).read(any(byte[].class));
137
138         final int expiryDuration = 200;
139         try (MessageSlicer slicer = MessageSlicer.builder().messageSliceSize(1)
140                 .logContext("testCheckExpiredSlicedMessageState")
141                 .fileBackedStreamFactory(mockFiledBackedStreamFactory)
142                 .expireStateAfterInactivity(expiryDuration, TimeUnit.MILLISECONDS).build()) {
143             slice(slicer, IDENTIFIER, new BytesMessage(new byte[]{1, 2}), testProbe.ref(), testProbe.ref(),
144                     mockOnFailureCallback);
145
146             Uninterruptibles.sleepUninterruptibly(expiryDuration + 50, TimeUnit.MILLISECONDS);
147             slicer.checkExpiredSlicedMessageState();
148
149             assertFailureCallback(RuntimeException.class);
150             verify(mockFiledBackedStream).cleanup();
151         }
152     }
153
154     private void assertFailureCallback(final Class<?> exceptionType) {
155         ArgumentCaptor<Throwable> exceptionCaptor = ArgumentCaptor.forClass(Throwable.class);
156         verify(mockOnFailureCallback).accept(exceptionCaptor.capture());
157         assertEquals("Exception type", exceptionType, exceptionCaptor.getValue().getClass());
158     }
159
160     private MessageSlicer newMessageSlicer(String logContext, final int messageSliceSize) {
161         return MessageSlicer.builder().messageSliceSize(messageSliceSize).logContext(logContext)
162                 .fileBackedStreamFactory(mockFiledBackedStreamFactory).build();
163     }
164
165     static void slice(MessageSlicer slicer, Identifier identifier, Serializable message, ActorRef sendTo,
166             ActorRef replyTo, Consumer<Throwable> onFailureCallback) {
167         slicer.slice(SliceOptions.builder().identifier(identifier).message(message).sendTo(sendTo).replyTo(replyTo)
168                 .onFailureCallback(onFailureCallback).build());
169     }
170 }