Avoid unnecessary unsuccessful AppendEntriesReply
[controller.git] / opendaylight / md-sal / sal-clustering-commons / src / test / java / org / opendaylight / controller / cluster / messaging / MessageSlicingIntegrationTest.java
1 /*
2  * Copyright (c) 2017 Inocybe Technologies and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.messaging;
9
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.assertFalse;
12 import static org.junit.Assert.assertTrue;
13 import static org.mockito.Matchers.any;
14 import static org.mockito.Mockito.doNothing;
15 import static org.mockito.Mockito.mock;
16 import static org.mockito.Mockito.reset;
17 import static org.mockito.Mockito.verify;
18 import static org.opendaylight.controller.cluster.messaging.MessageSlicerTest.slice;
19
20 import akka.actor.ActorRef;
21 import akka.actor.ActorSystem;
22 import akka.testkit.TestProbe;
23 import akka.testkit.javadsl.TestKit;
24 import java.io.ByteArrayOutputStream;
25 import java.io.IOException;
26 import java.io.ObjectOutputStream;
27 import java.util.Arrays;
28 import java.util.function.BiConsumer;
29 import java.util.function.Consumer;
30 import org.apache.commons.lang3.SerializationUtils;
31 import org.junit.After;
32 import org.junit.AfterClass;
33 import org.junit.Before;
34 import org.junit.Test;
35 import org.mockito.ArgumentCaptor;
36 import org.opendaylight.controller.cluster.io.FileBackedOutputStream;
37 import org.opendaylight.controller.cluster.io.FileBackedOutputStreamFactory;
38 import org.opendaylight.yangtools.concepts.Identifier;
39 import org.slf4j.Logger;
40 import org.slf4j.LoggerFactory;
41
42 /**
43  * End-to-end integration tests for message slicing.
44  *
45  * @author Thomas Pantelis
46  */
47 public class MessageSlicingIntegrationTest {
48     private static final Logger LOG = LoggerFactory.getLogger(MessageSlicingIntegrationTest.class);
49
50     private static final ActorSystem ACTOR_SYSTEM = ActorSystem.create("test");
51     private static final FileBackedOutputStreamFactory FILE_BACKED_STREAM_FACTORY =
52             new FileBackedOutputStreamFactory(1000000000, "target");
53     private static final Identifier IDENTIFIER = new StringIdentifier("stringId");
54     private static final int DONT_CARE = -1;
55
56     private final TestProbe sendToProbe = TestProbe.apply(ACTOR_SYSTEM);
57     private final TestProbe replyToProbe = TestProbe.apply(ACTOR_SYSTEM);
58
59     @SuppressWarnings("unchecked")
60     private final Consumer<Throwable> mockOnFailureCallback = mock(Consumer.class);
61
62     @SuppressWarnings("unchecked")
63     private final BiConsumer<Object, ActorRef> mockAssembledMessageCallback = mock(BiConsumer.class);
64
65     private final MessageAssembler assembler = MessageAssembler.builder()
66             .assembledMessageCallback(mockAssembledMessageCallback).logContext("test")
67             .fileBackedStreamFactory(FILE_BACKED_STREAM_FACTORY).build();
68
69     @Before
70     public void setup() {
71         doNothing().when(mockOnFailureCallback).accept(any(Throwable.class));
72         doNothing().when(mockAssembledMessageCallback).accept(any(Object.class), any(ActorRef.class));
73     }
74
75     @After
76     public void tearDown() {
77         assembler.close();
78     }
79
80     @AfterClass
81     public static void staticTearDown() {
82         TestKit.shutdownActorSystem(ACTOR_SYSTEM, true);
83     }
84
85     @Test
86     public void testSlicingWithChunks() throws IOException {
87         LOG.info("testSlicingWithChunks starting");
88
89         // First slice a message where the messageSliceSize divides evenly into the serialized size.
90
91         byte[] emptyMessageBytes = SerializationUtils.serialize(new BytesMessage(new byte[]{}));
92         int messageSliceSize = 10;
93         int expTotalSlices = emptyMessageBytes.length / messageSliceSize;
94         ByteArrayOutputStream byteStream = new ByteArrayOutputStream();
95         if (emptyMessageBytes.length % messageSliceSize > 0) {
96             expTotalSlices++;
97             int padding = messageSliceSize - emptyMessageBytes.length % messageSliceSize;
98             byte value = 1;
99             for (int i = 0; i < padding; i++, value++) {
100                 byteStream.write(value);
101             }
102         }
103
104         testSlicing("testSlicingWithChunks", messageSliceSize, expTotalSlices, byteStream.toByteArray());
105
106         // Now slice a message where the messageSliceSize doesn't divide evenly.
107
108         byteStream.write(new byte[]{100, 101, 102});
109         testSlicing("testSlicingWithChunks", messageSliceSize, expTotalSlices + 1, byteStream.toByteArray());
110
111         LOG.info("testSlicingWithChunks ending");
112     }
113
114     @Test
115     public void testSingleSlice() {
116         LOG.info("testSingleSlice starting");
117
118         // Slice a message where the serialized size is equal to the messageSliceSize. In this case it should
119         // just send the original message.
120
121         final BytesMessage message = new BytesMessage(new byte[]{1, 2, 3});
122         try (MessageSlicer slicer = newMessageSlicer("testSingleSlice", SerializationUtils.serialize(message).length)) {
123             final boolean wasSliced = slice(slicer, IDENTIFIER, message, sendToProbe.ref(), replyToProbe.ref(),
124                     mockOnFailureCallback);
125             assertFalse(wasSliced);
126
127             final BytesMessage sentMessage = sendToProbe.expectMsgClass(BytesMessage.class);
128             assertEquals("Sent message", message, sentMessage);
129         }
130
131         LOG.info("testSingleSlice ending");
132     }
133
134     @Test
135     public void testSlicingWithRetry() {
136         LOG.info("testSlicingWithRetry starting");
137
138         final BytesMessage message = new BytesMessage(new byte[]{1, 2, 3});
139         final int messageSliceSize = SerializationUtils.serialize(message).length / 2;
140         try (MessageSlicer slicer = newMessageSlicer("testSlicingWithRetry", messageSliceSize)) {
141             slice(slicer, IDENTIFIER, message, sendToProbe.ref(), replyToProbe.ref(), mockOnFailureCallback);
142
143             MessageSlice sliceMessage = sendToProbe.expectMsgClass(MessageSlice.class);
144             assembler.handleMessage(sliceMessage, sendToProbe.ref());
145
146             // Swallow the reply and send the MessageSlice again - it should return a failed reply.
147             replyToProbe.expectMsgClass(MessageSliceReply.class);
148             assembler.handleMessage(sliceMessage, sendToProbe.ref());
149
150             final MessageSliceReply failedReply = replyToProbe.expectMsgClass(MessageSliceReply.class);
151             assertFailedMessageSliceReply(failedReply, IDENTIFIER, true);
152
153             // Send the failed reply - slicing should be retried from the beginning.
154
155             slicer.handleMessage(failedReply);
156             while (true) {
157                 sliceMessage = sendToProbe.expectMsgClass(MessageSlice.class);
158                 assembler.handleMessage(sliceMessage, sendToProbe.ref());
159
160                 final MessageSliceReply reply = replyToProbe.expectMsgClass(MessageSliceReply.class);
161                 assertSuccessfulMessageSliceReply(reply, IDENTIFIER, sliceMessage.getSliceIndex());
162                 slicer.handleMessage(reply);
163
164                 if (reply.getSliceIndex() == sliceMessage.getTotalSlices()) {
165                     break;
166                 }
167             }
168
169             assertAssembledMessage(message, replyToProbe.ref());
170         }
171
172         LOG.info("testSlicingWithRetry ending");
173     }
174
175     @Test
176     public void testSlicingWithMaxRetriesReached() {
177         LOG.info("testSlicingWithMaxRetriesReached starting");
178
179         final BytesMessage message = new BytesMessage(new byte[]{1, 2, 3});
180         final int messageSliceSize = SerializationUtils.serialize(message).length / 2;
181         try (MessageSlicer slicer = newMessageSlicer("testSlicingWithMaxRetriesReached", messageSliceSize)) {
182             slice(slicer, IDENTIFIER, message, sendToProbe.ref(), replyToProbe.ref(), mockOnFailureCallback);
183
184             Identifier slicingId = null;
185             for (int i = 0; i < MessageSlicer.DEFAULT_MAX_SLICING_TRIES; i++) {
186                 MessageSlice sliceMessage = sendToProbe.expectMsgClass(MessageSlice.class);
187                 slicingId = sliceMessage.getIdentifier();
188                 assertMessageSlice(sliceMessage, IDENTIFIER, 1, DONT_CARE, SlicedMessageState.INITIAL_SLICE_HASH_CODE,
189                         replyToProbe.ref());
190                 assembler.handleMessage(sliceMessage, sendToProbe.ref());
191
192                 // Swallow the reply and send the MessageSlicer a reply with an invalid index.
193                 final MessageSliceReply reply = replyToProbe.expectMsgClass(MessageSliceReply.class);
194                 assertSuccessfulMessageSliceReply(reply, IDENTIFIER, sliceMessage.getSliceIndex());
195                 slicer.handleMessage(MessageSliceReply.success(reply.getIdentifier(), 100000, reply.getSendTo()));
196
197                 final AbortSlicing abortSlicing = sendToProbe.expectMsgClass(AbortSlicing.class);
198                 assertEquals("Identifier", slicingId, abortSlicing.getIdentifier());
199                 assembler.handleMessage(abortSlicing, sendToProbe.ref());
200             }
201
202             slicer.handleMessage(MessageSliceReply.success(slicingId, 100000, sendToProbe.ref()));
203
204             assertFailureCallback(RuntimeException.class);
205
206             assertFalse("MessageSlicer did not remove state for " + slicingId, slicer.hasState(slicingId));
207             assertFalse("MessageAssembler did not remove state for " + slicingId, assembler.hasState(slicingId));
208         }
209
210         LOG.info("testSlicingWithMaxRetriesReached ending");
211     }
212
213     @Test
214     public void testSlicingWithFailure() {
215         LOG.info("testSlicingWithFailure starting");
216
217         final BytesMessage message = new BytesMessage(new byte[]{1, 2, 3});
218         final int messageSliceSize = SerializationUtils.serialize(message).length / 2;
219         try (MessageSlicer slicer = newMessageSlicer("testSlicingWithFailure", messageSliceSize)) {
220             final boolean wasSliced = slice(slicer, IDENTIFIER, message, sendToProbe.ref(), replyToProbe.ref(),
221                     mockOnFailureCallback);
222             assertTrue(wasSliced);
223
224             MessageSlice sliceMessage = sendToProbe.expectMsgClass(MessageSlice.class);
225
226             MessageSliceException failure = new MessageSliceException("mock failure",
227                     new IOException("mock IOException"));
228             slicer.handleMessage(MessageSliceReply.failed(sliceMessage.getIdentifier(), failure, sendToProbe.ref()));
229
230             assertFailureCallback(IOException.class);
231
232             assertFalse("MessageSlicer did not remove state for " + sliceMessage.getIdentifier(),
233                     slicer.hasState(sliceMessage.getIdentifier()));
234         }
235
236         LOG.info("testSlicingWithFailure ending");
237     }
238
239     @Test
240     public void testSliceWithFileBackedOutputStream() throws IOException {
241         LOG.info("testSliceWithFileBackedOutputStream starting");
242
243         final BytesMessage message = new BytesMessage(new byte[]{1, 2, 3});
244         FileBackedOutputStream fileBackedStream = FILE_BACKED_STREAM_FACTORY.newInstance();
245         try (ObjectOutputStream out = new ObjectOutputStream(fileBackedStream)) {
246             out.writeObject(message);
247         }
248
249         try (MessageSlicer slicer = newMessageSlicer("testSliceWithFileBackedOutputStream",
250                 SerializationUtils.serialize(message).length)) {
251             slicer.slice(SliceOptions.builder().identifier(IDENTIFIER).fileBackedOutputStream(fileBackedStream)
252                     .sendTo(ACTOR_SYSTEM.actorSelection(sendToProbe.ref().path())).replyTo(replyToProbe.ref())
253                     .onFailureCallback(mockOnFailureCallback).build());
254
255             final MessageSlice sliceMessage = sendToProbe.expectMsgClass(MessageSlice.class);
256             assembler.handleMessage(sliceMessage, sendToProbe.ref());
257             assertAssembledMessage(message, replyToProbe.ref());
258         }
259
260         LOG.info("testSliceWithFileBackedOutputStream ending");
261     }
262
263     @SuppressWarnings("unchecked")
264     private void testSlicing(final String logContext, final int messageSliceSize, final int expTotalSlices,
265             final byte[] messageData) {
266         reset(mockAssembledMessageCallback);
267
268         final BytesMessage message = new BytesMessage(messageData);
269
270         try (MessageSlicer slicer = newMessageSlicer(logContext, messageSliceSize)) {
271             final boolean wasSliced = slice(slicer, IDENTIFIER, message, sendToProbe.ref(), replyToProbe.ref(),
272                     mockOnFailureCallback);
273             assertTrue(wasSliced);
274
275             Identifier slicingId = null;
276             int expLastSliceHashCode = SlicedMessageState.INITIAL_SLICE_HASH_CODE;
277             for (int sliceIndex = 1; sliceIndex <= expTotalSlices; sliceIndex++) {
278                 final MessageSlice sliceMessage = sendToProbe.expectMsgClass(MessageSlice.class);
279                 slicingId = sliceMessage.getIdentifier();
280                 assertMessageSlice(sliceMessage, IDENTIFIER, sliceIndex, expTotalSlices, expLastSliceHashCode,
281                         replyToProbe.ref());
282
283                 assembler.handleMessage(sliceMessage, sendToProbe.ref());
284
285                 final MessageSliceReply reply = replyToProbe.expectMsgClass(MessageSliceReply.class);
286                 assertSuccessfulMessageSliceReply(reply, IDENTIFIER, sliceIndex);
287
288                 expLastSliceHashCode = Arrays.hashCode(sliceMessage.getData());
289
290                 slicer.handleMessage(reply);
291             }
292
293             assertAssembledMessage(message, replyToProbe.ref());
294
295             assertFalse("MessageSlicer did not remove state for " + slicingId, slicer.hasState(slicingId));
296             assertFalse("MessageAssembler did not remove state for " + slicingId, assembler.hasState(slicingId));
297         }
298     }
299
300     private void assertFailureCallback(final Class<?> exceptionType) {
301         ArgumentCaptor<Throwable> exceptionCaptor = ArgumentCaptor.forClass(Throwable.class);
302         verify(mockOnFailureCallback).accept(exceptionCaptor.capture());
303         assertEquals("Exception type", exceptionType, exceptionCaptor.getValue().getClass());
304     }
305
306     private void assertAssembledMessage(final BytesMessage message, final ActorRef sender) {
307         assertAssembledMessage(mockAssembledMessageCallback, message, sender);
308     }
309
310     static void assertAssembledMessage(final BiConsumer<Object, ActorRef> mockAssembledMessageCallback,
311             final BytesMessage message, final ActorRef sender) {
312         ArgumentCaptor<Object> assembledMessageCaptor = ArgumentCaptor.forClass(Object.class);
313         ArgumentCaptor<ActorRef> senderActorRefCaptor = ArgumentCaptor.forClass(ActorRef.class);
314         verify(mockAssembledMessageCallback).accept(assembledMessageCaptor.capture(), senderActorRefCaptor.capture());
315         assertEquals("Assembled message", message, assembledMessageCaptor.getValue());
316         assertEquals("Sender ActorRef", sender, senderActorRefCaptor.getValue());
317     }
318
319     static void assertSuccessfulMessageSliceReply(final MessageSliceReply reply, final Identifier identifier,
320             final int sliceIndex) {
321         assertEquals("Identifier", identifier, ((MessageSliceIdentifier)reply.getIdentifier())
322                 .getClientIdentifier());
323         assertEquals("SliceIndex", sliceIndex, reply.getSliceIndex());
324     }
325
326     static void assertFailedMessageSliceReply(final MessageSliceReply reply, final Identifier identifier,
327             final boolean isRetriable) {
328         assertEquals("Identifier", identifier, ((MessageSliceIdentifier)reply.getIdentifier())
329                 .getClientIdentifier());
330         assertEquals("Failure present", Boolean.TRUE, reply.getFailure().isPresent());
331         assertEquals("isRetriable", isRetriable, reply.getFailure().get().isRetriable());
332     }
333
334     static void assertMessageSlice(final MessageSlice sliceMessage, final Identifier identifier, final int sliceIndex,
335             final int totalSlices, final int lastSliceHashCode, final ActorRef replyTo) {
336         assertEquals("Identifier", identifier, ((MessageSliceIdentifier)sliceMessage.getIdentifier())
337                 .getClientIdentifier());
338         assertEquals("SliceIndex", sliceIndex, sliceMessage.getSliceIndex());
339         assertEquals("LastSliceHashCode", lastSliceHashCode, sliceMessage.getLastSliceHashCode());
340         assertEquals("ReplyTo", replyTo, sliceMessage.getReplyTo());
341
342         if (totalSlices != DONT_CARE) {
343             assertEquals("TotalSlices", totalSlices, sliceMessage.getTotalSlices());
344         }
345     }
346
347     private static MessageSlicer newMessageSlicer(final String logContext, final int messageSliceSize) {
348         return MessageSlicer.builder().messageSliceSize(messageSliceSize).logContext(logContext)
349                 .fileBackedStreamFactory(FILE_BACKED_STREAM_FACTORY).build();
350     }
351 }