2 * Copyright (c) 2017 Inocybe Technologies and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.messaging;
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.assertFalse;
12 import static org.junit.Assert.assertTrue;
13 import static org.mockito.ArgumentMatchers.any;
14 import static org.mockito.Mockito.doNothing;
15 import static org.mockito.Mockito.mock;
16 import static org.mockito.Mockito.reset;
17 import static org.mockito.Mockito.verify;
18 import static org.opendaylight.controller.cluster.messaging.MessageSlicerTest.slice;
20 import akka.actor.ActorRef;
21 import akka.actor.ActorSystem;
22 import akka.testkit.TestProbe;
23 import akka.testkit.javadsl.TestKit;
24 import java.io.ByteArrayOutputStream;
25 import java.io.IOException;
26 import java.io.ObjectOutputStream;
27 import java.util.Arrays;
28 import java.util.function.BiConsumer;
29 import java.util.function.Consumer;
30 import org.apache.commons.lang3.SerializationUtils;
31 import org.junit.After;
32 import org.junit.AfterClass;
33 import org.junit.Before;
34 import org.junit.Test;
35 import org.mockito.ArgumentCaptor;
36 import org.opendaylight.controller.cluster.io.FileBackedOutputStream;
37 import org.opendaylight.controller.cluster.io.FileBackedOutputStreamFactory;
38 import org.opendaylight.yangtools.concepts.Identifier;
39 import org.slf4j.Logger;
40 import org.slf4j.LoggerFactory;
43 * End-to-end integration tests for message slicing.
45 * @author Thomas Pantelis
47 public class MessageSlicingIntegrationTest {
48 private static final Logger LOG = LoggerFactory.getLogger(MessageSlicingIntegrationTest.class);
// Single actor system named "test"; static, so it is shared by every test instance of this class.
50 private static final ActorSystem ACTOR_SYSTEM = ActorSystem.create("test");
// Stream factory shared by the assembler below and by every slicer built in newMessageSlicer().
// NOTE(review): the arguments 1000000000 and "target" are presumably a file threshold and a
// temp-file directory - confirm against the FileBackedOutputStreamFactory constructor.
51 private static final FileBackedOutputStreamFactory FILE_BACKED_STREAM_FACTORY =
52 new FileBackedOutputStreamFactory(1000000000, "target");
// Client identifier handed to the slicer; the assertion helpers expect it back as the
// "client identifier" of each MessageSliceIdentifier.
53 private static final Identifier IDENTIFIER = new StringIdentifier("stringId");
// Sentinel for assertMessageSlice(): when passed as totalSlices, the TotalSlices check is skipped.
54 private static final int DONT_CARE = -1;
// sendToProbe is the destination of sliced messages (tests expect MessageSlice/AbortSlicing on it);
// replyToProbe is the reply-to target (tests expect MessageSliceReply on it).
56 private final TestProbe sendToProbe = TestProbe.apply(ACTOR_SYSTEM);
57 private final TestProbe replyToProbe = TestProbe.apply(ACTOR_SYSTEM);
// mock(Consumer.class) yields a raw Consumer, hence the unchecked suppression.
59 @SuppressWarnings("unchecked")
60 private final Consumer<Throwable> mockOnFailureCallback = mock(Consumer.class);
// Same raw-type situation for the BiConsumer mock.
62 @SuppressWarnings("unchecked")
63 private final BiConsumer<Object, ActorRef> mockAssembledMessageCallback = mock(BiConsumer.class);
// Assembler under test: reports assembled messages to the mock callback above, uses log context
// "test" and the shared stream factory.
65 private final MessageAssembler assembler = MessageAssembler.builder()
66 .assembledMessageCallback(mockAssembledMessageCallback).logContext("test")
67 .fileBackedStreamFactory(FILE_BACKED_STREAM_FACTORY).build();
// NOTE(review): the method declaration enclosing these statements is not visible in this excerpt;
// presumably a per-test setup method - confirm against the full file.
// Stub both mocked callbacks so that accept(...) does nothing for any Throwable, and for any
// Object/ActorRef pair, respectively.
71 doNothing().when(mockOnFailureCallback).accept(any(Throwable.class));
72 doNothing().when(mockAssembledMessageCallback).accept(any(Object.class), any(ActorRef.class));
// NOTE(review): only the declaration is visible in this excerpt; the body is not shown, so what
// this method releases cannot be stated from here - confirm against the full file.
76 public void tearDown() {
// Shuts down the shared ACTOR_SYSTEM through TestKit once the class is done with it.
// NOTE(review): the second argument (true) presumably asks TestKit to verify the shutdown - confirm.
81 public static void staticTearDown() {
82 TestKit.shutdownActorSystem(ACTOR_SYSTEM, true);
// Runs the testSlicing() helper twice on payloads accumulated in byteStream: first expecting
// expTotalSlices slices, then - after appending three more bytes - expecting expTotalSlices + 1.
// NOTE(review): some lines of this method are not visible in this excerpt (e.g. the declaration of
// `value` and possibly other statements inside the if-block) - confirm against the full file.
86 public void testSlicingWithChunks() throws IOException {
87 LOG.info("testSlicingWithChunks starting");
89 // First slice a message where the messageSliceSize divides evenly into the serialized size.
// Serialized size of a BytesMessage with an empty payload, used as the baseline length.
91 byte[] emptyMessageBytes = SerializationUtils.serialize(new BytesMessage(new byte[]{}));
92 int messageSliceSize = 10;
// Integer division: counts only the whole slices of the baseline length.
93 int expTotalSlices = emptyMessageBytes.length / messageSliceSize;
94 ByteArrayOutputStream byteStream = new ByteArrayOutputStream();
95 if (emptyMessageBytes.length % messageSliceSize > 0) {
// Number of payload bytes that brings baseline length + padding up to a multiple of messageSliceSize.
// assumes each payload byte adds exactly one byte to the serialized form - TODO confirm
97 int padding = messageSliceSize - emptyMessageBytes.length % messageSliceSize;
// Writes `padding` bytes with increasing values into the payload stream.
99 for (int i = 0; i < padding; i++, value++) {
100 byteStream.write(value);
104 testSlicing("testSlicingWithChunks", messageSliceSize, expTotalSlices, byteStream.toByteArray());
106 // Now slice a message where the messageSliceSize doesn't divide evenly.
// byteStream is reused, so the second payload is the first payload plus these three bytes.
108 byteStream.write(new byte[]{100, 101, 102});
109 testSlicing("testSlicingWithChunks", messageSliceSize, expTotalSlices + 1, byteStream.toByteArray());
111 LOG.info("testSlicingWithChunks ending");
// Verifies the no-slicing path: with a slice size equal to the message's serialized length,
// slice(...) returns false and the destination probe receives the original BytesMessage.
115 public void testSingleSlice() {
116 LOG.info("testSingleSlice starting");
118 // Slice a message where the serialized size is equal to the messageSliceSize. In this case it should
119 // just send the original message.
121 final BytesMessage message = new BytesMessage(new byte[]{1, 2, 3});
// try-with-resources closes the slicer when the block exits.
122 try (MessageSlicer slicer = newMessageSlicer("testSingleSlice", SerializationUtils.serialize(message).length)) {
123 final boolean wasSliced = slice(slicer, IDENTIFIER, message, sendToProbe.ref(), replyToProbe.ref(),
124 mockOnFailureCallback);
125 assertFalse(wasSliced);
// The probe must see a BytesMessage (not a MessageSlice) that equals the one submitted.
127 final BytesMessage sentMessage = sendToProbe.expectMsgClass(BytesMessage.class);
128 assertEquals("Sent message", message, sentMessage);
131 LOG.info("testSingleSlice ending");
// Delivers the first slice to the assembler twice, checks that the second delivery produces a
// failed, retriable reply, feeds that reply to the slicer and then continues the slice/reply
// exchange until the assembled message is reported.
// NOTE(review): the body of the final `if` and any loop enclosing the statements after
// slicer.handleMessage(failedReply) are not visible in this excerpt - confirm against the full file.
135 public void testSlicingWithRetry() {
136 LOG.info("testSlicingWithRetry starting");
138 final BytesMessage message = new BytesMessage(new byte[]{1, 2, 3});
// Slice size is half of the serialized length (integer division).
139 final int messageSliceSize = SerializationUtils.serialize(message).length / 2;
140 try (MessageSlicer slicer = newMessageSlicer("testSlicingWithRetry", messageSliceSize)) {
141 slice(slicer, IDENTIFIER, message, sendToProbe.ref(), replyToProbe.ref(), mockOnFailureCallback);
// First delivery of the first slice to the assembler.
143 MessageSlice sliceMessage = sendToProbe.expectMsgClass(MessageSlice.class);
144 assembler.handleMessage(sliceMessage, sendToProbe.ref());
146 // Swallow the reply and send the MessageSlice again - it should return a failed reply.
147 replyToProbe.expectMsgClass(MessageSliceReply.class);
148 assembler.handleMessage(sliceMessage, sendToProbe.ref());
// The duplicate delivery must be answered with a failure flagged as retriable.
150 final MessageSliceReply failedReply = replyToProbe.expectMsgClass(MessageSliceReply.class);
151 assertFailedMessageSliceReply(failedReply, IDENTIFIER, true);
153 // Send the failed reply - slicing should be retried from the beginning.
155 slicer.handleMessage(failedReply);
// Relay a slice to the assembler and its (successful) reply back to the slicer.
157 sliceMessage = sendToProbe.expectMsgClass(MessageSlice.class);
158 assembler.handleMessage(sliceMessage, sendToProbe.ref());
160 final MessageSliceReply reply = replyToProbe.expectMsgClass(MessageSliceReply.class);
161 assertSuccessfulMessageSliceReply(reply, IDENTIFIER, sliceMessage.getSliceIndex());
162 slicer.handleMessage(reply);
// True once the reply acknowledges the last slice (reply index equals the total slice count).
164 if (reply.getSliceIndex() == sliceMessage.getTotalSlices()) {
// The assembler must have reported the original message with replyToProbe as the sender.
169 assertAssembledMessage(message, replyToProbe.ref());
172 LOG.info("testSlicingWithRetry ending");
// Repeatedly answers the first slice with a reply carrying a bogus slice index (100000) and checks
// that, in the end, the failure callback receives a RuntimeException and that neither the slicer
// nor the assembler retains state for the slicing identifier.
// NOTE(review): the continuation of the assertMessageSlice(...) call and the end of the for-loop are
// not visible in this excerpt, so which trailing statements sit inside the loop cannot be confirmed.
176 public void testSlicingWithMaxRetriesReached() {
177 LOG.info("testSlicingWithMaxRetriesReached starting");
179 final BytesMessage message = new BytesMessage(new byte[]{1, 2, 3});
// Slice size is half of the serialized length (integer division).
180 final int messageSliceSize = SerializationUtils.serialize(message).length / 2;
181 try (MessageSlicer slicer = newMessageSlicer("testSlicingWithMaxRetriesReached", messageSliceSize)) {
182 slice(slicer, IDENTIFIER, message, sendToProbe.ref(), replyToProbe.ref(), mockOnFailureCallback);
184 Identifier slicingId = null;
// One iteration per slicing try, up to MessageSlicer.DEFAULT_MAX_SLICING_TRIES.
185 for (int i = 0; i < MessageSlicer.DEFAULT_MAX_SLICING_TRIES; i++) {
186 MessageSlice sliceMessage = sendToProbe.expectMsgClass(MessageSlice.class);
187 slicingId = sliceMessage.getIdentifier();
// Each try is expected to start over: slice index 1 with the initial last-slice hash code;
// the total slice count is not checked (DONT_CARE).
188 assertMessageSlice(sliceMessage, IDENTIFIER, 1, DONT_CARE, SlicedMessageState.INITIAL_SLICE_HASH_CODE,
190 assembler.handleMessage(sliceMessage, sendToProbe.ref());
192 // Swallow the reply and send the MessageSlicer a reply with an invalid index.
193 final MessageSliceReply reply = replyToProbe.expectMsgClass(MessageSliceReply.class);
194 assertSuccessfulMessageSliceReply(reply, IDENTIFIER, sliceMessage.getSliceIndex());
195 slicer.handleMessage(MessageSliceReply.success(reply.getIdentifier(), 100000, reply.getSendTo()));
// The slicer is expected to send an AbortSlicing for the same identifier, which is relayed
// to the assembler.
197 final AbortSlicing abortSlicing = sendToProbe.expectMsgClass(AbortSlicing.class);
198 assertEquals("Identifier", slicingId, abortSlicing.getIdentifier());
199 assembler.handleMessage(abortSlicing, sendToProbe.ref());
// Another reply with the bogus index 100000, built from the last seen slicing identifier.
202 slicer.handleMessage(MessageSliceReply.success(slicingId, 100000, sendToProbe.ref()));
// The failure callback must have been invoked with an exception whose class is exactly RuntimeException.
204 assertFailureCallback(RuntimeException.class);
206 assertFalse("MessageSlicer did not remove state for " + slicingId, slicer.hasState(slicingId));
207 assertFalse("MessageAssembler did not remove state for " + slicingId, assembler.hasState(slicingId));
210 LOG.info("testSlicingWithMaxRetriesReached ending");
// Feeds the slicer a failed reply for the first slice and checks that the failure callback is
// invoked with an IOException and that the slicer drops its state for that slicing identifier.
214 public void testSlicingWithFailure() {
215 LOG.info("testSlicingWithFailure starting");
217 final BytesMessage message = new BytesMessage(new byte[]{1, 2, 3});
// Slice size is half of the serialized length (integer division).
218 final int messageSliceSize = SerializationUtils.serialize(message).length / 2;
219 try (MessageSlicer slicer = newMessageSlicer("testSlicingWithFailure", messageSliceSize)) {
220 final boolean wasSliced = slice(slicer, IDENTIFIER, message, sendToProbe.ref(), replyToProbe.ref(),
221 mockOnFailureCallback);
222 assertTrue(wasSliced);
224 MessageSlice sliceMessage = sendToProbe.expectMsgClass(MessageSlice.class);
// The failure wraps an IOException as its cause; the callback is later asserted to receive an
// IOException rather than the MessageSliceException itself.
226 MessageSliceException failure = new MessageSliceException("mock failure",
227 new IOException("mock IOException"));
228 slicer.handleMessage(MessageSliceReply.failed(sliceMessage.getIdentifier(), failure, sendToProbe.ref()));
230 assertFailureCallback(IOException.class);
232 assertFalse("MessageSlicer did not remove state for " + sliceMessage.getIdentifier(),
233 slicer.hasState(sliceMessage.getIdentifier()));
236 LOG.info("testSlicingWithFailure ending");
// Slices a message that was pre-serialized into a FileBackedOutputStream (supplied through
// SliceOptions) and checks that the assembler reports the original message after one slice.
240 public void testSliceWithFileBackedOutputStream() throws IOException {
241 LOG.info("testSliceWithFileBackedOutputStream starting");
243 final BytesMessage message = new BytesMessage(new byte[]{1, 2, 3});
// Serialize the message into a stream obtained from the shared factory; the ObjectOutputStream
// is closed by try-with-resources once the object has been written.
244 FileBackedOutputStream fileBackedStream = FILE_BACKED_STREAM_FACTORY.newInstance();
245 try (ObjectOutputStream out = new ObjectOutputStream(fileBackedStream)) {
246 out.writeObject(message);
// Slice size equals the message's serialized length.
249 try (MessageSlicer slicer = newMessageSlicer("testSliceWithFileBackedOutputStream",
250 SerializationUtils.serialize(message).length)) {
// The destination is given as an ActorSelection built from the probe's path; replies go to replyToProbe.
251 slicer.slice(SliceOptions.builder().identifier(IDENTIFIER).fileBackedOutputStream(fileBackedStream)
252 .sendTo(ACTOR_SYSTEM.actorSelection(sendToProbe.ref().path())).replyTo(replyToProbe.ref())
253 .onFailureCallback(mockOnFailureCallback).build());
// Here a MessageSlice is expected on the probe; handing it to the assembler must yield the
// original message with replyToProbe as the sender.
255 final MessageSlice sliceMessage = sendToProbe.expectMsgClass(MessageSlice.class);
256 assembler.handleMessage(sliceMessage, sendToProbe.ref());
257 assertAssembledMessage(message, replyToProbe.ref());
260 LOG.info("testSliceWithFileBackedOutputStream ending");
// Shared driver: slices a BytesMessage wrapping messageData with the given slice size, relays each
// of the expTotalSlices slices to the assembler and each reply back to the slicer, then checks the
// assembled message and that both sides dropped their state.
//   logContext       - log context for the slicer built here
//   messageSliceSize - slice size for the slicer built here
//   expTotalSlices   - number of slices expected (also asserted on every slice)
//   messageData      - payload of the message to slice
// NOTE(review): the continuation of the assertMessageSlice(...) call and the end of the for-loop are
// not visible in this excerpt - confirm against the full file.
263 @SuppressWarnings("unchecked")
264 private void testSlicing(final String logContext, final int messageSliceSize, final int expTotalSlices,
265 final byte[] messageData) {
// Clear interactions recorded by an earlier call, since the verification in
// assertAssembledMessage() expects a single accept(...) invocation.
266 reset(mockAssembledMessageCallback);
268 final BytesMessage message = new BytesMessage(messageData);
270 try (MessageSlicer slicer = newMessageSlicer(logContext, messageSliceSize)) {
271 final boolean wasSliced = slice(slicer, IDENTIFIER, message, sendToProbe.ref(), replyToProbe.ref(),
272 mockOnFailureCallback);
273 assertTrue(wasSliced);
275 Identifier slicingId = null;
// Expected "last slice hash code" of the next slice: the initial constant for the first slice,
// afterwards the hash of the previous slice's data (updated at the end of each iteration).
276 int expLastSliceHashCode = SlicedMessageState.INITIAL_SLICE_HASH_CODE;
// Slice indexes are 1-based.
277 for (int sliceIndex = 1; sliceIndex <= expTotalSlices; sliceIndex++) {
278 final MessageSlice sliceMessage = sendToProbe.expectMsgClass(MessageSlice.class);
279 slicingId = sliceMessage.getIdentifier();
280 assertMessageSlice(sliceMessage, IDENTIFIER, sliceIndex, expTotalSlices, expLastSliceHashCode,
283 assembler.handleMessage(sliceMessage, sendToProbe.ref());
285 final MessageSliceReply reply = replyToProbe.expectMsgClass(MessageSliceReply.class);
286 assertSuccessfulMessageSliceReply(reply, IDENTIFIER, sliceIndex);
288 expLastSliceHashCode = Arrays.hashCode(sliceMessage.getData());
// Hand the reply to the slicer so that slicing can proceed.
290 slicer.handleMessage(reply);
293 assertAssembledMessage(message, replyToProbe.ref());
295 assertFalse("MessageSlicer did not remove state for " + slicingId, slicer.hasState(slicingId));
296 assertFalse("MessageAssembler did not remove state for " + slicingId, assembler.hasState(slicingId));
// Verifies that the mocked failure callback was invoked and that the captured Throwable's runtime
// class is exactly exceptionType (compared with assertEquals on getClass(), so subclasses do not match).
300 private void assertFailureCallback(final Class<?> exceptionType) {
301 ArgumentCaptor<Throwable> exceptionCaptor = ArgumentCaptor.forClass(Throwable.class);
302 verify(mockOnFailureCallback).accept(exceptionCaptor.capture());
303 assertEquals("Exception type", exceptionType, exceptionCaptor.getValue().getClass());
// Convenience overload: delegates to the static assertAssembledMessage using this test's
// mockAssembledMessageCallback.
306 private void assertAssembledMessage(final BytesMessage message, final ActorRef sender) {
307 assertAssembledMessage(mockAssembledMessageCallback, message, sender);
// Verifies that the given mock callback's accept(...) was invoked, and that the captured arguments
// equal the expected message and sender.
//   mockAssembledMessageCallback - Mockito mock whose invocation is verified
//   message                      - expected assembled message
//   sender                       - expected sender ActorRef
310 static void assertAssembledMessage(final BiConsumer<Object, ActorRef> mockAssembledMessageCallback,
311 final BytesMessage message, final ActorRef sender) {
312 ArgumentCaptor<Object> assembledMessageCaptor = ArgumentCaptor.forClass(Object.class);
313 ArgumentCaptor<ActorRef> senderActorRefCaptor = ArgumentCaptor.forClass(ActorRef.class);
314 verify(mockAssembledMessageCallback).accept(assembledMessageCaptor.capture(), senderActorRefCaptor.capture());
315 assertEquals("Assembled message", message, assembledMessageCaptor.getValue());
316 assertEquals("Sender ActorRef", sender, senderActorRefCaptor.getValue());
// Asserts that the reply's identifier (cast to MessageSliceIdentifier) carries the expected client
// identifier and that the reply reports the expected slice index.
// Note: only identifier and index are checked; absence of a failure is not asserted here.
319 static void assertSuccessfulMessageSliceReply(final MessageSliceReply reply, final Identifier identifier,
320 final int sliceIndex) {
321 assertEquals("Identifier", identifier, ((MessageSliceIdentifier)reply.getIdentifier())
322 .getClientIdentifier());
323 assertEquals("SliceIndex", sliceIndex, reply.getSliceIndex());
// Asserts that the reply carries the expected client identifier, that a failure is present, and
// that the failure's isRetriable() flag matches the expected value.
326 static void assertFailedMessageSliceReply(final MessageSliceReply reply, final Identifier identifier,
327 final boolean isRetriable) {
328 assertEquals("Identifier", identifier, ((MessageSliceIdentifier)reply.getIdentifier())
329 .getClientIdentifier());
// NOTE(review): assertTrue("Failure present", ...) (already statically imported in this file)
// would express this check more directly than comparing against Boolean.TRUE.
330 assertEquals("Failure present", Boolean.TRUE, reply.getFailure().isPresent());
331 assertEquals("isRetriable", isRetriable, reply.getFailure().orElseThrow().isRetriable());
// Asserts the header fields of a MessageSlice: client identifier, slice index, last-slice hash
// code and reply-to reference. The total slice count is checked only when totalSlices is not
// DONT_CARE.
334 static void assertMessageSlice(final MessageSlice sliceMessage, final Identifier identifier, final int sliceIndex,
335 final int totalSlices, final int lastSliceHashCode, final ActorRef replyTo) {
336 assertEquals("Identifier", identifier, ((MessageSliceIdentifier)sliceMessage.getIdentifier())
337 .getClientIdentifier());
338 assertEquals("SliceIndex", sliceIndex, sliceMessage.getSliceIndex());
339 assertEquals("LastSliceHashCode", lastSliceHashCode, sliceMessage.getLastSliceHashCode());
340 assertEquals("ReplyTo", replyTo, sliceMessage.getReplyTo());
// DONT_CARE lets callers skip this check when the total is not known up front.
342 if (totalSlices != DONT_CARE) {
343 assertEquals("TotalSlices", totalSlices, sliceMessage.getTotalSlices());
// Builds a MessageSlicer with the given slice size and log context, backed by the shared
// FILE_BACKED_STREAM_FACTORY. Callers in this file close it via try-with-resources.
347 private static MessageSlicer newMessageSlicer(final String logContext, final int messageSliceSize) {
348 return MessageSlicer.builder().messageSliceSize(messageSliceSize).logContext(logContext)
349 .fileBackedStreamFactory(FILE_BACKED_STREAM_FACTORY).build();