Slice front-end request messages
[controller.git] / opendaylight / md-sal / sal-clustering-commons / src / test / java / org / opendaylight / controller / cluster / messaging / MessageSlicerTest.java
1 /*
2  * Copyright (c) 2017 Inocybe Technologies and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.messaging;
9
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.assertFalse;
12 import static org.mockito.Matchers.any;
13 import static org.mockito.Matchers.anyInt;
14 import static org.mockito.Mockito.doNothing;
15 import static org.mockito.Mockito.doReturn;
16 import static org.mockito.Mockito.doThrow;
17 import static org.mockito.Mockito.mock;
18 import static org.mockito.Mockito.never;
19 import static org.mockito.Mockito.verify;
20 import static org.mockito.Mockito.verifyNoMoreInteractions;
21
22 import akka.actor.ActorRef;
23 import com.google.common.util.concurrent.Uninterruptibles;
24 import java.io.IOException;
25 import java.io.Serializable;
26 import java.util.concurrent.TimeUnit;
27 import java.util.function.Consumer;
28 import org.junit.Before;
29 import org.junit.Test;
30 import org.mockito.ArgumentCaptor;
31 import org.mockito.Mock;
32 import org.opendaylight.controller.cluster.io.FileBackedOutputStream;
33 import org.opendaylight.yangtools.concepts.Identifier;
34
35 /**
36  * Unit tests for MessageSlicer.
37  *
38  * @author Thomas Pantelis
39  */
40 public class MessageSlicerTest extends AbstractMessagingTest {
41     @Mock
42     private Consumer<Throwable> mockOnFailureCallback;
43
44     @Override
45     @Before
46     public void setup() throws IOException {
47         super.setup();
48
49         doNothing().when(mockOnFailureCallback).accept(any(Throwable.class));
50     }
51
52     @Test
53     public void testHandledMessages() {
54         try (MessageSlicer slicer = newMessageSlicer("testHandledMessages", 100)) {
55             MessageSliceIdentifier messageSliceId = new MessageSliceIdentifier(IDENTIFIER, slicer.getId());
56             final MessageSliceReply reply = MessageSliceReply.success(messageSliceId, 1, testProbe.ref());
57             assertEquals("isHandledMessage", Boolean.TRUE, MessageSlicer.isHandledMessage(reply));
58             assertEquals("isHandledMessage", Boolean.FALSE, MessageSlicer.isHandledMessage(new Object()));
59
60             assertEquals("handledMessage", Boolean.TRUE, slicer.handleMessage(reply));
61             assertEquals("handledMessage", Boolean.FALSE, slicer.handleMessage(new Object()));
62             assertEquals("handledMessage", Boolean.FALSE, slicer.handleMessage(MessageSliceReply.success(
63                     IDENTIFIER, 1,testProbe.ref())));
64             assertEquals("handledMessage", Boolean.FALSE, slicer.handleMessage(MessageSliceReply.success(
65                     new MessageSliceIdentifier(IDENTIFIER, slicer.getId() + 1), 1,testProbe.ref())));
66         }
67     }
68
69     @Test
70     public void testSliceWithFailedSerialization() throws IOException {
71         IOException mockFailure = new IOException("mock IOException");
72         doThrow(mockFailure).when(mockFiledBackedStream).write(any(byte[].class), anyInt(), anyInt());
73         doThrow(mockFailure).when(mockFiledBackedStream).write(any(byte[].class));
74         doThrow(mockFailure).when(mockFiledBackedStream).write(anyInt());
75         doThrow(mockFailure).when(mockFiledBackedStream).flush();
76
77         try (MessageSlicer slicer = newMessageSlicer("testSliceWithFailedSerialization", 100)) {
78             final boolean wasSliced = slice(slicer, IDENTIFIER, new BytesMessage(new byte[]{}), testProbe.ref(),
79                     testProbe.ref(), mockOnFailureCallback);
80             assertFalse(wasSliced);
81
82             assertFailureCallback(IOException.class);
83             verify(mockFiledBackedStream).cleanup();
84         }
85     }
86
87     @Test
88     public void testSliceWithByteSourceFailure() throws IOException {
89         IOException mockFailure = new IOException("mock IOException");
90         doThrow(mockFailure).when(mockByteSource).openStream();
91         doThrow(mockFailure).when(mockByteSource).openBufferedStream();
92
93         try (MessageSlicer slicer = newMessageSlicer("testSliceWithByteSourceFailure", 100)) {
94             final boolean wasSliced = slice(slicer, IDENTIFIER, new BytesMessage(new byte[]{}), testProbe.ref(),
95                     testProbe.ref(), mockOnFailureCallback);
96             assertFalse(wasSliced);
97
98             assertFailureCallback(IOException.class);
99             verify(mockFiledBackedStream).cleanup();
100         }
101     }
102
103     @Test
104     public void testSliceWithInputStreamFailure() throws IOException {
105         doReturn(0).when(mockInputStream).read(any(byte[].class));
106
107         try (MessageSlicer slicer = newMessageSlicer("testSliceWithInputStreamFailure", 2)) {
108             final boolean wasSliced = slice(slicer, IDENTIFIER, new BytesMessage(new byte[]{}), testProbe.ref(),
109                     testProbe.ref(), mockOnFailureCallback);
110             assertFalse(wasSliced);
111
112             assertFailureCallback(IOException.class);
113             verify(mockFiledBackedStream).cleanup();
114         }
115     }
116
117     @Test
118     public void testMessageSliceReplyWithNoState() {
119         try (MessageSlicer slicer = newMessageSlicer("testMessageSliceReplyWithNoState", 1000)) {
120             MessageSliceIdentifier messageSliceId = new MessageSliceIdentifier(IDENTIFIER, slicer.getId());
121             slicer.handleMessage(MessageSliceReply.success(messageSliceId, 1, testProbe.ref()));
122             final AbortSlicing abortSlicing = testProbe.expectMsgClass(AbortSlicing.class);
123             assertEquals("Identifier", messageSliceId, abortSlicing.getIdentifier());
124         }
125     }
126
127     @Test
128     public void testCloseAllSlicedMessageState() throws IOException {
129         doReturn(1).when(mockInputStream).read(any(byte[].class));
130
131         final MessageSlicer slicer = newMessageSlicer("testCloseAllSlicedMessageState", 1);
132         slice(slicer, IDENTIFIER, new BytesMessage(new byte[]{1, 2}), testProbe.ref(), testProbe.ref(),
133                 mockOnFailureCallback);
134
135         slicer.close();
136
137         verify(mockFiledBackedStream).cleanup();
138         verifyNoMoreInteractions(mockOnFailureCallback);
139     }
140
141     @Test
142     public void testCancelSlicing() throws IOException {
143         doReturn(1).when(mockInputStream).read(any(byte[].class));
144
145         final MessageSlicer slicer = newMessageSlicer("testCloseAllSlicedMessageState", 1);
146         slicer.slice(SliceOptions.builder().identifier(IDENTIFIER).fileBackedOutputStream(mockFiledBackedStream)
147                 .sendTo(testProbe.ref()).replyTo(testProbe.ref()).onFailureCallback(mockOnFailureCallback).build());
148
149         final FileBackedOutputStream mockFiledBackedStream2 = mock(FileBackedOutputStream.class);
150         setupMockFiledBackedStream(mockFiledBackedStream2);
151         slicer.slice(SliceOptions.builder().identifier(new StringIdentifier("test2"))
152                 .fileBackedOutputStream(mockFiledBackedStream2).sendTo(testProbe.ref()).replyTo(testProbe.ref())
153                 .onFailureCallback(mockOnFailureCallback).build());
154
155         slicer.cancelSlicing(id -> id.equals(IDENTIFIER));
156
157         verify(mockFiledBackedStream).cleanup();
158         verify(mockFiledBackedStream2, never()).cleanup();
159         verifyNoMoreInteractions(mockOnFailureCallback);
160     }
161
162     @Test
163     public void testCheckExpiredSlicedMessageState() throws IOException {
164         doReturn(1).when(mockInputStream).read(any(byte[].class));
165
166         final int expiryDuration = 200;
167         try (MessageSlicer slicer = MessageSlicer.builder().messageSliceSize(1)
168                 .logContext("testCheckExpiredSlicedMessageState")
169                 .fileBackedStreamFactory(mockFiledBackedStreamFactory)
170                 .expireStateAfterInactivity(expiryDuration, TimeUnit.MILLISECONDS).build()) {
171             slice(slicer, IDENTIFIER, new BytesMessage(new byte[]{1, 2}), testProbe.ref(), testProbe.ref(),
172                     mockOnFailureCallback);
173
174             Uninterruptibles.sleepUninterruptibly(expiryDuration + 50, TimeUnit.MILLISECONDS);
175             slicer.checkExpiredSlicedMessageState();
176
177             assertFailureCallback(RuntimeException.class);
178             verify(mockFiledBackedStream).cleanup();
179         }
180     }
181
182     private void assertFailureCallback(final Class<?> exceptionType) {
183         ArgumentCaptor<Throwable> exceptionCaptor = ArgumentCaptor.forClass(Throwable.class);
184         verify(mockOnFailureCallback).accept(exceptionCaptor.capture());
185         assertEquals("Exception type", exceptionType, exceptionCaptor.getValue().getClass());
186     }
187
188     private MessageSlicer newMessageSlicer(String logContext, final int messageSliceSize) {
189         return MessageSlicer.builder().messageSliceSize(messageSliceSize).logContext(logContext)
190                 .fileBackedStreamFactory(mockFiledBackedStreamFactory).build();
191     }
192
193     static boolean slice(MessageSlicer slicer, Identifier identifier, Serializable message, ActorRef sendTo,
194             ActorRef replyTo, Consumer<Throwable> onFailureCallback) {
195         return slicer.slice(SliceOptions.builder().identifier(identifier).message(message).sendTo(sendTo)
196                 .replyTo(replyTo).onFailureCallback(onFailureCallback).build());
197     }
198 }