a490a247899f05c27d8e184123d7a6a2d916c0b8
[controller.git] / opendaylight / md-sal / sal-clustering-commons / src / test / java / org / opendaylight / controller / cluster / messaging / MessageSlicingIntegrationTest.java
1 /*
2  * Copyright (c) 2017 Inocybe Technologies and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.messaging;
9
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.assertFalse;
12 import static org.mockito.Matchers.any;
13 import static org.mockito.Mockito.doNothing;
14 import static org.mockito.Mockito.mock;
15 import static org.mockito.Mockito.reset;
16 import static org.mockito.Mockito.verify;
17 import static org.opendaylight.controller.cluster.messaging.MessageSlicerTest.slice;
18
19 import akka.actor.ActorRef;
20 import akka.actor.ActorSystem;
21 import akka.testkit.JavaTestKit;
22 import akka.testkit.TestProbe;
23 import java.io.ByteArrayOutputStream;
24 import java.io.IOException;
25 import java.io.ObjectOutputStream;
26 import java.util.Arrays;
27 import java.util.function.BiConsumer;
28 import java.util.function.Consumer;
29 import org.apache.commons.lang3.SerializationUtils;
30 import org.junit.After;
31 import org.junit.AfterClass;
32 import org.junit.Before;
33 import org.junit.Test;
34 import org.mockito.ArgumentCaptor;
35 import org.opendaylight.controller.cluster.io.FileBackedOutputStream;
36 import org.opendaylight.controller.cluster.io.FileBackedOutputStreamFactory;
37 import org.opendaylight.yangtools.concepts.Identifier;
38 import org.slf4j.Logger;
39 import org.slf4j.LoggerFactory;
40
41 /**
42  * End-to-end integration tests for message slicing.
43  *
44  * @author Thomas Pantelis
45  */
46 public class MessageSlicingIntegrationTest {
47     private static final Logger LOG = LoggerFactory.getLogger(MessageSlicingIntegrationTest.class);
48
49     private static final ActorSystem ACTOR_SYSTEM = ActorSystem.create("test");
50     private static final FileBackedOutputStreamFactory FILE_BACKED_STREAM_FACTORY =
51             new FileBackedOutputStreamFactory(1000000000, "target");
52     private static final Identifier IDENTIFIER = new StringIdentifier("stringId");
53     private static final int DONT_CARE = -1;
54
55     private final TestProbe sendToProbe = TestProbe.apply(ACTOR_SYSTEM);
56     private final TestProbe replyToProbe = TestProbe.apply(ACTOR_SYSTEM);
57
58     @SuppressWarnings("unchecked")
59     private final Consumer<Throwable> mockOnFailureCallback = mock(Consumer.class);
60
61     @SuppressWarnings("unchecked")
62     private final BiConsumer<Object, ActorRef> mockAssembledMessageCallback = mock(BiConsumer.class);
63
64     private final MessageAssembler assembler = MessageAssembler.builder()
65             .assembledMessageCallback(mockAssembledMessageCallback).logContext("test")
66             .filedBackedStreamFactory(FILE_BACKED_STREAM_FACTORY).build();
67
68     @Before
69     public void setup() {
70         doNothing().when(mockOnFailureCallback).accept(any(Throwable.class));
71         doNothing().when(mockAssembledMessageCallback).accept(any(Object.class), any(ActorRef.class));
72     }
73
74     @After
75     public void tearDown() {
76         assembler.close();
77     }
78
79     @AfterClass
80     public static void staticTearDown() {
81         JavaTestKit.shutdownActorSystem(ACTOR_SYSTEM, Boolean.TRUE);
82     }
83
84     @Test
85     public void testSlicingWithChunks() throws IOException {
86         LOG.info("testSlicingWithChunks starting");
87
88         // First slice a message where the messageSliceSize divides evenly into the serialized size.
89
90         byte[] emptyMessageBytes = SerializationUtils.serialize(new BytesMessage(new byte[]{}));
91         int messageSliceSize = 10;
92         int expTotalSlices = emptyMessageBytes.length / messageSliceSize;
93         ByteArrayOutputStream byteStream = new ByteArrayOutputStream();
94         if (emptyMessageBytes.length % messageSliceSize > 0) {
95             expTotalSlices++;
96             int padding = messageSliceSize - emptyMessageBytes.length % messageSliceSize;
97             byte value = 1;
98             for (int i = 0; i < padding; i++, value++) {
99                 byteStream.write(value);
100             }
101         }
102
103         testSlicing("testSlicingWithChunks", messageSliceSize, expTotalSlices, byteStream.toByteArray());
104
105         // Now slice a message where the messageSliceSize doesn't divide evenly.
106
107         byteStream.write(new byte[]{100, 101, 102});
108         testSlicing("testSlicingWithChunks", messageSliceSize, expTotalSlices + 1, byteStream.toByteArray());
109
110         LOG.info("testSlicingWithChunks ending");
111     }
112
113     @Test
114     public void testSingleSlice() {
115         LOG.info("testSingleSlice starting");
116
117         // Slice a message where the serialized size is equal to the messageSliceSize. In this case it should
118         // just send the original message.
119
120         final BytesMessage message = new BytesMessage(new byte[]{1, 2, 3});
121         try (MessageSlicer slicer = newMessageSlicer("testSingleSlice", SerializationUtils.serialize(message).length)) {
122             slice(slicer, IDENTIFIER, message, sendToProbe.ref(), replyToProbe.ref(), mockOnFailureCallback);
123
124             final BytesMessage sentMessage = sendToProbe.expectMsgClass(BytesMessage.class);
125             assertEquals("Sent message", message, sentMessage);
126         }
127
128         LOG.info("testSingleSlice ending");
129     }
130
131     @Test
132     public void testSlicingWithRetry() {
133         LOG.info("testSlicingWithRetry starting");
134
135         final BytesMessage message = new BytesMessage(new byte[]{1, 2, 3});
136         final int messageSliceSize = SerializationUtils.serialize(message).length / 2;
137         try (MessageSlicer slicer = newMessageSlicer("testSlicingWithRetry", messageSliceSize)) {
138             slice(slicer, IDENTIFIER, message, sendToProbe.ref(), replyToProbe.ref(), mockOnFailureCallback);
139
140             MessageSlice sliceMessage = sendToProbe.expectMsgClass(MessageSlice.class);
141             assembler.handleMessage(sliceMessage, sendToProbe.ref());
142
143             // Swallow the reply and send the MessageSlice again - it should return a failed reply.
144             replyToProbe.expectMsgClass(MessageSliceReply.class);
145             assembler.handleMessage(sliceMessage, sendToProbe.ref());
146
147             final MessageSliceReply failedReply = replyToProbe.expectMsgClass(MessageSliceReply.class);
148             assertFailedMessageSliceReply(failedReply, IDENTIFIER, true);
149
150             // Send the failed reply - slicing should be retried from the beginning.
151
152             slicer.handleMessage(failedReply);
153             while (true) {
154                 sliceMessage = sendToProbe.expectMsgClass(MessageSlice.class);
155                 assembler.handleMessage(sliceMessage, sendToProbe.ref());
156
157                 final MessageSliceReply reply = replyToProbe.expectMsgClass(MessageSliceReply.class);
158                 assertSuccessfulMessageSliceReply(reply, IDENTIFIER, sliceMessage.getSliceIndex());
159                 slicer.handleMessage(reply);
160
161                 if (reply.getSliceIndex() == sliceMessage.getTotalSlices()) {
162                     break;
163                 }
164             }
165
166             assertAssembledMessage(message, replyToProbe.ref());
167         }
168
169         LOG.info("testSlicingWithRetry ending");
170     }
171
172     @Test
173     public void testSlicingWithMaxRetriesReached() {
174         LOG.info("testSlicingWithMaxRetriesReached starting");
175
176         final BytesMessage message = new BytesMessage(new byte[]{1, 2, 3});
177         final int messageSliceSize = SerializationUtils.serialize(message).length / 2;
178         try (MessageSlicer slicer = newMessageSlicer("testSlicingWithMaxRetriesReached", messageSliceSize)) {
179             slice(slicer, IDENTIFIER, message, sendToProbe.ref(), replyToProbe.ref(), mockOnFailureCallback);
180
181             Identifier slicingId = null;
182             for (int i = 0; i < MessageSlicer.DEFAULT_MAX_SLICING_TRIES; i++) {
183                 MessageSlice sliceMessage = sendToProbe.expectMsgClass(MessageSlice.class);
184                 slicingId = sliceMessage.getIdentifier();
185                 assertMessageSlice(sliceMessage, IDENTIFIER, 1, DONT_CARE, SlicedMessageState.INITIAL_SLICE_HASH_CODE,
186                         replyToProbe.ref());
187                 assembler.handleMessage(sliceMessage, sendToProbe.ref());
188
189                 // Swallow the reply and send the MessageSlicer a reply with an invalid index.
190                 final MessageSliceReply reply = replyToProbe.expectMsgClass(MessageSliceReply.class);
191                 assertSuccessfulMessageSliceReply(reply, IDENTIFIER, sliceMessage.getSliceIndex());
192                 slicer.handleMessage(MessageSliceReply.success(reply.getIdentifier(), 100000, reply.getSendTo()));
193
194                 final AbortSlicing abortSlicing = sendToProbe.expectMsgClass(AbortSlicing.class);
195                 assertEquals("Identifier", slicingId, abortSlicing.getIdentifier());
196                 assembler.handleMessage(abortSlicing, sendToProbe.ref());
197             }
198
199             slicer.handleMessage(MessageSliceReply.success(slicingId, 100000, sendToProbe.ref()));
200
201             assertFailureCallback(RuntimeException.class);
202
203             assertFalse("MessageSlicer did not remove state for " + slicingId, slicer.hasState(slicingId));
204             assertFalse("MessageAssembler did not remove state for " + slicingId, assembler.hasState(slicingId));
205         }
206
207         LOG.info("testSlicingWithMaxRetriesReached ending");
208     }
209
210     @Test
211     public void testSlicingWithFailure() {
212         LOG.info("testSlicingWithFailure starting");
213
214         final BytesMessage message = new BytesMessage(new byte[]{1, 2, 3});
215         final int messageSliceSize = SerializationUtils.serialize(message).length / 2;
216         try (MessageSlicer slicer = newMessageSlicer("testSlicingWithFailure", messageSliceSize)) {
217             slice(slicer, IDENTIFIER, message, sendToProbe.ref(), replyToProbe.ref(), mockOnFailureCallback);
218
219             MessageSlice sliceMessage = sendToProbe.expectMsgClass(MessageSlice.class);
220
221             MessageSliceException failure = new MessageSliceException("mock failure",
222                     new IOException("mock IOException"));
223             slicer.handleMessage(MessageSliceReply.failed(sliceMessage.getIdentifier(), failure, sendToProbe.ref()));
224
225             assertFailureCallback(IOException.class);
226
227             assertFalse("MessageSlicer did not remove state for " + sliceMessage.getIdentifier(),
228                     slicer.hasState(sliceMessage.getIdentifier()));
229         }
230
231         LOG.info("testSlicingWithFailure ending");
232     }
233
234     @Test
235     public void testSliceWithFileBackedOutputStream() throws IOException {
236         LOG.info("testSliceWithFileBackedOutputStream starting");
237
238         final BytesMessage message = new BytesMessage(new byte[]{1, 2, 3});
239         FileBackedOutputStream fileBackedStream = FILE_BACKED_STREAM_FACTORY.newInstance();
240         try (ObjectOutputStream out = new ObjectOutputStream(fileBackedStream)) {
241             out.writeObject(message);
242         }
243
244         try (MessageSlicer slicer = newMessageSlicer("testSliceWithFileBackedOutputStream",
245                 SerializationUtils.serialize(message).length)) {
246             slicer.slice(SliceOptions.builder().identifier(IDENTIFIER).fileBackedOutputStream(fileBackedStream)
247                     .sendTo(ACTOR_SYSTEM.actorSelection(sendToProbe.ref().path())).replyTo(replyToProbe.ref())
248                     .onFailureCallback(mockOnFailureCallback).build());
249
250             final MessageSlice sliceMessage = sendToProbe.expectMsgClass(MessageSlice.class);
251             assembler.handleMessage(sliceMessage, sendToProbe.ref());
252             assertAssembledMessage(message, replyToProbe.ref());
253         }
254
255         LOG.info("testSliceWithFileBackedOutputStream ending");
256     }
257
258     @SuppressWarnings("unchecked")
259     private void testSlicing(String logContext, int messageSliceSize, int expTotalSlices, byte[] messageData) {
260         reset(mockAssembledMessageCallback);
261
262         final BytesMessage message = new BytesMessage(messageData);
263
264         try (MessageSlicer slicer = newMessageSlicer(logContext, messageSliceSize)) {
265             slice(slicer, IDENTIFIER, message, sendToProbe.ref(), replyToProbe.ref(), mockOnFailureCallback);
266
267             Identifier slicingId = null;
268             int expLastSliceHashCode = SlicedMessageState.INITIAL_SLICE_HASH_CODE;
269             for (int sliceIndex = 1; sliceIndex <= expTotalSlices; sliceIndex++) {
270                 final MessageSlice sliceMessage = sendToProbe.expectMsgClass(MessageSlice.class);
271                 slicingId = sliceMessage.getIdentifier();
272                 assertMessageSlice(sliceMessage, IDENTIFIER, sliceIndex, expTotalSlices, expLastSliceHashCode,
273                         replyToProbe.ref());
274
275                 assembler.handleMessage(sliceMessage, sendToProbe.ref());
276
277                 final MessageSliceReply reply = replyToProbe.expectMsgClass(MessageSliceReply.class);
278                 assertSuccessfulMessageSliceReply(reply, IDENTIFIER, sliceIndex);
279
280                 expLastSliceHashCode = Arrays.hashCode(sliceMessage.getData());
281
282                 slicer.handleMessage(reply);
283             }
284
285             assertAssembledMessage(message, replyToProbe.ref());
286
287             assertFalse("MessageSlicer did not remove state for " + slicingId, slicer.hasState(slicingId));
288             assertFalse("MessageAssembler did not remove state for " + slicingId, assembler.hasState(slicingId));
289         }
290     }
291
292     private void assertFailureCallback(final Class<?> exceptionType) {
293         ArgumentCaptor<Throwable> exceptionCaptor = ArgumentCaptor.forClass(Throwable.class);
294         verify(mockOnFailureCallback).accept(exceptionCaptor.capture());
295         assertEquals("Exception type", exceptionType, exceptionCaptor.getValue().getClass());
296     }
297
298     private void assertAssembledMessage(final BytesMessage message, final ActorRef sender) {
299         assertAssembledMessage(mockAssembledMessageCallback, message, sender);
300     }
301
302     static void assertAssembledMessage(final BiConsumer<Object, ActorRef> mockAssembledMessageCallback,
303             final BytesMessage message, final ActorRef sender) {
304         ArgumentCaptor<Object> assembledMessageCaptor = ArgumentCaptor.forClass(Object.class);
305         ArgumentCaptor<ActorRef> senderActorRefCaptor = ArgumentCaptor.forClass(ActorRef.class);
306         verify(mockAssembledMessageCallback).accept(assembledMessageCaptor.capture(), senderActorRefCaptor.capture());
307         assertEquals("Assembled message", message, assembledMessageCaptor.getValue());
308         assertEquals("Sender ActorRef", sender, senderActorRefCaptor.getValue());
309     }
310
311     static void assertSuccessfulMessageSliceReply(MessageSliceReply reply, Identifier identifier, int sliceIndex) {
312         assertEquals("Identifier", identifier, ((MessageSliceIdentifier)reply.getIdentifier())
313                 .getClientIdentifier());
314         assertEquals("SliceIndex", sliceIndex, reply.getSliceIndex());
315     }
316
317     static void assertFailedMessageSliceReply(MessageSliceReply reply, Identifier identifier, boolean isRetriable) {
318         assertEquals("Identifier", identifier, ((MessageSliceIdentifier)reply.getIdentifier())
319                 .getClientIdentifier());
320         assertEquals("Failure present", Boolean.TRUE, reply.getFailure().isPresent());
321         assertEquals("isRetriable", isRetriable, reply.getFailure().get().isRetriable());
322     }
323
324     static void assertMessageSlice(MessageSlice sliceMessage, Identifier identifier, int sliceIndex, int totalSlices,
325             int lastSliceHashCode, ActorRef replyTo) {
326         assertEquals("Identifier", identifier, ((MessageSliceIdentifier)sliceMessage.getIdentifier())
327                 .getClientIdentifier());
328         assertEquals("SliceIndex", sliceIndex, sliceMessage.getSliceIndex());
329         assertEquals("LastSliceHashCode", lastSliceHashCode, sliceMessage.getLastSliceHashCode());
330         assertEquals("ReplyTo", replyTo, sliceMessage.getReplyTo());
331
332         if (totalSlices != DONT_CARE) {
333             assertEquals("TotalSlices", totalSlices, sliceMessage.getTotalSlices());
334         }
335     }
336
337     private MessageSlicer newMessageSlicer(String logContext, final int messageSliceSize) {
338         return MessageSlicer.builder().messageSliceSize(messageSliceSize).logContext(logContext)
339                 .filedBackedStreamFactory(FILE_BACKED_STREAM_FACTORY).build();
340     }
341 }