2 * Copyright (c) 2017 Inocybe Technologies and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.messaging;
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.assertFalse;
12 import static org.mockito.Matchers.any;
13 import static org.mockito.Mockito.doNothing;
14 import static org.mockito.Mockito.mock;
15 import static org.mockito.Mockito.reset;
16 import static org.mockito.Mockito.verify;
17 import static org.opendaylight.controller.cluster.messaging.MessageSlicerTest.slice;
19 import akka.actor.ActorRef;
20 import akka.actor.ActorSystem;
21 import akka.testkit.JavaTestKit;
22 import akka.testkit.TestProbe;
23 import java.io.ByteArrayOutputStream;
24 import java.io.IOException;
25 import java.io.ObjectOutputStream;
26 import java.util.Arrays;
27 import java.util.function.BiConsumer;
28 import java.util.function.Consumer;
29 import org.apache.commons.lang3.SerializationUtils;
30 import org.junit.After;
31 import org.junit.AfterClass;
32 import org.junit.Before;
33 import org.junit.Test;
34 import org.mockito.ArgumentCaptor;
35 import org.opendaylight.controller.cluster.io.FileBackedOutputStream;
36 import org.opendaylight.controller.cluster.io.FileBackedOutputStreamFactory;
37 import org.opendaylight.yangtools.concepts.Identifier;
38 import org.slf4j.Logger;
39 import org.slf4j.LoggerFactory;
42 * End-to-end integration tests for message slicing.
44 * @author Thomas Pantelis
// Test class that runs a MessageSlicer and a MessageAssembler against each other, using Akka
// TestProbes as the message endpoints and Mockito mocks as the callbacks.
46 public class MessageSlicingIntegrationTest {
47 private static final Logger LOG = LoggerFactory.getLogger(MessageSlicingIntegrationTest.class);
// Single actor system shared by all tests in this class; staticTearDown() shuts it down.
49 private static final ActorSystem ACTOR_SYSTEM = ActorSystem.create("test");
// Stream factory handed to the assembler below and to every slicer built by newMessageSlicer().
// NOTE(review): 1000000000 and "target" are presumably a file threshold and a temp directory -
// confirm against FileBackedOutputStreamFactory.
50 private static final FileBackedOutputStreamFactory FILE_BACKED_STREAM_FACTORY =
51 new FileBackedOutputStreamFactory(1000000000, "target");
// Client-side identifier passed to every slice request and checked by the assert* helpers.
52 private static final Identifier IDENTIFIER = new StringIdentifier("stringId");
// Sentinel for assertMessageSlice(): when passed as totalSlices, the total-slices check is skipped.
53 private static final int DONT_CARE = -1;
// Probe used as the destination of sliced messages; tests read MessageSlice/AbortSlicing from it.
55 private final TestProbe sendToProbe = TestProbe.apply(ACTOR_SYSTEM);
// Probe used as the reply-to actor; tests read MessageSliceReply instances from it.
56 private final TestProbe replyToProbe = TestProbe.apply(ACTOR_SYSTEM);
// mock(Consumer.class) yields a raw Consumer, hence the unchecked suppression.
58 @SuppressWarnings("unchecked")
59 private final Consumer<Throwable> mockOnFailureCallback = mock(Consumer.class);
// Same raw-type situation for the BiConsumer mock.
61 @SuppressWarnings("unchecked")
62 private final BiConsumer<Object, ActorRef> mockAssembledMessageCallback = mock(BiConsumer.class);
// Assembler under test; it reports re-assembled messages to the mock callback so tests can verify them.
64 private final MessageAssembler assembler = MessageAssembler.builder()
65 .assembledMessageCallback(mockAssembledMessageCallback).logContext("test")
66 .fileBackedStreamFactory(FILE_BACKED_STREAM_FACTORY).build();
// Stub both mock callbacks so that accept(...) does nothing for any argument.
// NOTE(review): the enclosing method header is not visible here; presumably a per-test setup hook - confirm.
70 doNothing().when(mockOnFailureCallback).accept(any(Throwable.class));
71 doNothing().when(mockAssembledMessageCallback).accept(any(Object.class), any(ActorRef.class));
// Instance-level tear-down hook.
// NOTE(review): neither its annotation nor its body is visible here - confirm what it releases.
75 public void tearDown() {
// Class-level tear-down: shuts down the shared ACTOR_SYSTEM through the Akka test kit.
// NOTE(review): the Boolean.TRUE argument presumably asks the test kit to verify the shutdown -
// confirm against the JavaTestKit.shutdownActorSystem documentation.
80 public static void staticTearDown() {
81 JavaTestKit.shutdownActorSystem(ACTOR_SYSTEM, Boolean.TRUE);
// Slices a BytesMessage into several chunks twice via testSlicing(): first with a payload intended to
// make the serialized size an exact multiple of messageSliceSize, then with three more payload bytes
// so that one additional slice is expected.
// NOTE(review): some statements of this method (e.g. the declaration of `value`) are not visible here.
85 public void testSlicingWithChunks() throws IOException {
86 LOG.info("testSlicingWithChunks starting");
88 // First slice a message where the messageSliceSize divides evenly into the serialized size.
// Serialized size of a BytesMessage with an empty payload, used as the baseline for the padding maths.
90 byte[] emptyMessageBytes = SerializationUtils.serialize(new BytesMessage(new byte[]{}));
91 int messageSliceSize = 10;
// Number of whole slices in the empty-payload serialization.
92 int expTotalSlices = emptyMessageBytes.length / messageSliceSize;
// Accumulates the payload bytes that are wrapped in the BytesMessage passed to testSlicing().
93 ByteArrayOutputStream byteStream = new ByteArrayOutputStream();
94 if (emptyMessageBytes.length % messageSliceSize > 0) {
// Bytes needed to round the empty-message size up to the next multiple of messageSliceSize.
96 int padding = messageSliceSize - emptyMessageBytes.length % messageSliceSize;
// Fill the payload with `padding` bytes, incrementing `value` on each iteration.
98 for (int i = 0; i < padding; i++, value++) {
99 byteStream.write(value);
103 testSlicing("testSlicingWithChunks", messageSliceSize, expTotalSlices, byteStream.toByteArray());
105 // Now slice a message where the messageSliceSize doesn't divide evenly.
// Three more payload bytes: one additional slice is expected.
107 byteStream.write(new byte[]{100, 101, 102});
108 testSlicing("testSlicingWithChunks", messageSliceSize, expTotalSlices + 1, byteStream.toByteArray());
110 LOG.info("testSlicingWithChunks ending");
// Checks the no-slicing path: with messageSliceSize equal to the serialized message size, the probe
// is expected to receive the original BytesMessage rather than a MessageSlice.
114 public void testSingleSlice() {
115 LOG.info("testSingleSlice starting");
117 // Slice a message where the serialized size is equal to the messageSliceSize. In this case it should
118 // just send the original message.
120 final BytesMessage message = new BytesMessage(new byte[]{1, 2, 3});
// The slicer is configured with exactly the serialized length of the message.
121 try (MessageSlicer slicer = newMessageSlicer("testSingleSlice", SerializationUtils.serialize(message).length)) {
122 slice(slicer, IDENTIFIER, message, sendToProbe.ref(), replyToProbe.ref(), mockOnFailureCallback);
// expectMsgClass fails the test if the next message is not a BytesMessage.
124 final BytesMessage sentMessage = sendToProbe.expectMsgClass(BytesMessage.class);
125 assertEquals("Sent message", message, sentMessage);
128 LOG.info("testSingleSlice ending");
// Checks the retry path: the same MessageSlice is delivered to the assembler twice, the second reply
// is asserted to be a retriable failure, and after that failure is given to the slicer the message
// is expected to be re-sliced and assembled successfully.
// NOTE(review): the loop construct around the final slice/reply exchange is not visible here.
132 public void testSlicingWithRetry() {
133 LOG.info("testSlicingWithRetry starting");
135 final BytesMessage message = new BytesMessage(new byte[]{1, 2, 3});
// Half the serialized size, so the message cannot fit in a single slice.
136 final int messageSliceSize = SerializationUtils.serialize(message).length / 2;
137 try (MessageSlicer slicer = newMessageSlicer("testSlicingWithRetry", messageSliceSize)) {
138 slice(slicer, IDENTIFIER, message, sendToProbe.ref(), replyToProbe.ref(), mockOnFailureCallback);
140 MessageSlice sliceMessage = sendToProbe.expectMsgClass(MessageSlice.class);
141 assembler.handleMessage(sliceMessage, sendToProbe.ref());
143 // Swallow the reply and send the MessageSlice again - it should return a failed reply.
144 replyToProbe.expectMsgClass(MessageSliceReply.class);
145 assembler.handleMessage(sliceMessage, sendToProbe.ref());
// The reply to the duplicate slice must carry a failure flagged as retriable.
147 final MessageSliceReply failedReply = replyToProbe.expectMsgClass(MessageSliceReply.class);
148 assertFailedMessageSliceReply(failedReply, IDENTIFIER, true);
150 // Send the failed reply - slicing should be retried from the beginning.
152 slicer.handleMessage(failedReply);
// Relay each slice to the assembler and each successful reply back to the slicer.
154 sliceMessage = sendToProbe.expectMsgClass(MessageSlice.class);
155 assembler.handleMessage(sliceMessage, sendToProbe.ref());
157 final MessageSliceReply reply = replyToProbe.expectMsgClass(MessageSliceReply.class);
158 assertSuccessfulMessageSliceReply(reply, IDENTIFIER, sliceMessage.getSliceIndex());
159 slicer.handleMessage(reply);
// True once the reply acknowledges the last slice; presumably the exit condition of the relay loop - confirm.
161 if (reply.getSliceIndex() == sliceMessage.getTotalSlices()) {
// The assembler must have delivered the original message, with replyToProbe as the sender.
166 assertAssembledMessage(message, replyToProbe.ref());
169 LOG.info("testSlicingWithRetry ending");
// Checks the give-up path: the slicer is repeatedly answered with a reply carrying an invalid slice
// index; the test expects an AbortSlicing per attempt and finally a RuntimeException on the failure
// callback, with slicing state removed from both slicer and assembler.
// NOTE(review): the continuation of the assertMessageSlice call and the loop's closing line are not visible here.
173 public void testSlicingWithMaxRetriesReached() {
174 LOG.info("testSlicingWithMaxRetriesReached starting");
176 final BytesMessage message = new BytesMessage(new byte[]{1, 2, 3});
// Half the serialized size, so the message cannot fit in a single slice.
177 final int messageSliceSize = SerializationUtils.serialize(message).length / 2;
178 try (MessageSlicer slicer = newMessageSlicer("testSlicingWithMaxRetriesReached", messageSliceSize)) {
179 slice(slicer, IDENTIFIER, message, sendToProbe.ref(), replyToProbe.ref(), mockOnFailureCallback);
// Remembers the slicing identifier of the most recent slice for the assertions further down.
181 Identifier slicingId = null;
182 for (int i = 0; i < MessageSlicer.DEFAULT_MAX_SLICING_TRIES; i++) {
183 MessageSlice sliceMessage = sendToProbe.expectMsgClass(MessageSlice.class);
184 slicingId = sliceMessage.getIdentifier();
// Every attempt must start again at slice index 1 with the initial hash code; total slices is not checked.
185 assertMessageSlice(sliceMessage, IDENTIFIER, 1, DONT_CARE, SlicedMessageState.INITIAL_SLICE_HASH_CODE,
187 assembler.handleMessage(sliceMessage, sendToProbe.ref());
189 // Swallow the reply and send the MessageSlicer a reply with an invalid index.
190 final MessageSliceReply reply = replyToProbe.expectMsgClass(MessageSliceReply.class);
191 assertSuccessfulMessageSliceReply(reply, IDENTIFIER, sliceMessage.getSliceIndex());
// 100000 is used as a slice index the slicer is not expecting.
192 slicer.handleMessage(MessageSliceReply.success(reply.getIdentifier(), 100000, reply.getSendTo()));
// The slicer is expected to send AbortSlicing for the same slicing id; it is relayed to the assembler.
194 final AbortSlicing abortSlicing = sendToProbe.expectMsgClass(AbortSlicing.class);
195 assertEquals("Identifier", slicingId, abortSlicing.getIdentifier());
196 assembler.handleMessage(abortSlicing, sendToProbe.ref());
// One more invalid-index reply; the failure callback is then expected to have received a RuntimeException.
199 slicer.handleMessage(MessageSliceReply.success(slicingId, 100000, sendToProbe.ref()));
201 assertFailureCallback(RuntimeException.class);
// Neither side may retain state for the abandoned slicing id.
203 assertFalse("MessageSlicer did not remove state for " + slicingId, slicer.hasState(slicingId));
204 assertFalse("MessageAssembler did not remove state for " + slicingId, assembler.hasState(slicingId));
207 LOG.info("testSlicingWithMaxRetriesReached ending");
// Checks the hard-failure path: the slicer is handed a failed reply wrapping an IOException; the test
// expects the failure callback to receive an IOException and the slicer to drop its state.
211 public void testSlicingWithFailure() {
212 LOG.info("testSlicingWithFailure starting");
214 final BytesMessage message = new BytesMessage(new byte[]{1, 2, 3});
// Half the serialized size, so the message cannot fit in a single slice.
215 final int messageSliceSize = SerializationUtils.serialize(message).length / 2;
216 try (MessageSlicer slicer = newMessageSlicer("testSlicingWithFailure", messageSliceSize)) {
217 slice(slicer, IDENTIFIER, message, sendToProbe.ref(), replyToProbe.ref(), mockOnFailureCallback);
// Only the slice's identifier is needed; the slice is never given to the assembler in this test.
219 MessageSlice sliceMessage = sendToProbe.expectMsgClass(MessageSlice.class);
// Hand-built failure whose cause is an IOException.
221 MessageSliceException failure = new MessageSliceException("mock failure",
222 new IOException("mock IOException"));
223 slicer.handleMessage(MessageSliceReply.failed(sliceMessage.getIdentifier(), failure, sendToProbe.ref()));
// The callback is expected to see the IOException type, not MessageSliceException.
225 assertFailureCallback(IOException.class);
227 assertFalse("MessageSlicer did not remove state for " + sliceMessage.getIdentifier(),
228 slicer.hasState(sliceMessage.getIdentifier()));
231 LOG.info("testSlicingWithFailure ending");
// Checks slicing from a pre-serialized FileBackedOutputStream instead of a message object: the
// message is written to the stream up front, sliced through SliceOptions, and the assembler is
// expected to reproduce the original message.
235 public void testSliceWithFileBackedOutputStream() throws IOException {
236 LOG.info("testSliceWithFileBackedOutputStream starting");
238 final BytesMessage message = new BytesMessage(new byte[]{1, 2, 3});
// Serialize the message into a stream from the shared factory before slicing.
239 FileBackedOutputStream fileBackedStream = FILE_BACKED_STREAM_FACTORY.newInstance();
240 try (ObjectOutputStream out = new ObjectOutputStream(fileBackedStream)) {
241 out.writeObject(message);
// The slice size equals the serialized length of the message.
244 try (MessageSlicer slicer = newMessageSlicer("testSliceWithFileBackedOutputStream",
245 SerializationUtils.serialize(message).length)) {
// Build the request directly: the stream is supplied in place of a message, and sendTo is an
// ActorSelection resolved from the probe's path.
246 slicer.slice(SliceOptions.builder().identifier(IDENTIFIER).fileBackedOutputStream(fileBackedStream)
247 .sendTo(ACTOR_SYSTEM.actorSelection(sendToProbe.ref().path())).replyTo(replyToProbe.ref())
248 .onFailureCallback(mockOnFailureCallback).build());
// A MessageSlice is expected here; handing that one slice to the assembler must yield the message.
250 final MessageSlice sliceMessage = sendToProbe.expectMsgClass(MessageSlice.class);
251 assembler.handleMessage(sliceMessage, sendToProbe.ref());
252 assertAssembledMessage(message, replyToProbe.ref());
255 LOG.info("testSliceWithFileBackedOutputStream ending");
// Shared driver for the chunked-slicing tests: slices a BytesMessage wrapping messageData, relays
// each of the expTotalSlices slices to the assembler and each reply back to the slicer, then checks
// the assembled message and that both sides dropped their state.
//
// logContext       - log context given to the slicer
// messageSliceSize - maximum slice size given to the slicer
// expTotalSlices   - number of MessageSlice messages the test expects to see
// messageData      - payload wrapped in the BytesMessage that is sliced
// NOTE(review): the continuation of the assertMessageSlice call is not visible here.
258 @SuppressWarnings("unchecked")
259 private void testSlicing(String logContext, int messageSliceSize, int expTotalSlices, byte[] messageData) {
// Reset the mock so assertAssembledMessage's verify only sees the invocation made by this call.
260 reset(mockAssembledMessageCallback);
262 final BytesMessage message = new BytesMessage(messageData);
264 try (MessageSlicer slicer = newMessageSlicer(logContext, messageSliceSize)) {
265 slice(slicer, IDENTIFIER, message, sendToProbe.ref(), replyToProbe.ref(), mockOnFailureCallback);
267 Identifier slicingId = null;
// Hash code the next slice should report for its predecessor; starts at the initial value.
268 int expLastSliceHashCode = SlicedMessageState.INITIAL_SLICE_HASH_CODE;
// Slice indexes are 1-based.
269 for (int sliceIndex = 1; sliceIndex <= expTotalSlices; sliceIndex++) {
270 final MessageSlice sliceMessage = sendToProbe.expectMsgClass(MessageSlice.class);
271 slicingId = sliceMessage.getIdentifier();
272 assertMessageSlice(sliceMessage, IDENTIFIER, sliceIndex, expTotalSlices, expLastSliceHashCode,
275 assembler.handleMessage(sliceMessage, sendToProbe.ref());
277 final MessageSliceReply reply = replyToProbe.expectMsgClass(MessageSliceReply.class);
278 assertSuccessfulMessageSliceReply(reply, IDENTIFIER, sliceIndex);
// The next slice is expected to carry the hash of this slice's data.
280 expLastSliceHashCode = Arrays.hashCode(sliceMessage.getData());
// Acknowledge the slice to the slicer.
282 slicer.handleMessage(reply);
285 assertAssembledMessage(message, replyToProbe.ref());
// Both sides must have cleaned up their per-slicing state.
287 assertFalse("MessageSlicer did not remove state for " + slicingId, slicer.hasState(slicingId));
288 assertFalse("MessageAssembler did not remove state for " + slicingId, assembler.hasState(slicingId));
// Verifies that the mock failure callback was invoked and that the Throwable it received has exactly
// the class exceptionType (compared with equals on the Class object, so a subclass does not match).
292 private void assertFailureCallback(final Class<?> exceptionType) {
293 ArgumentCaptor<Throwable> exceptionCaptor = ArgumentCaptor.forClass(Throwable.class);
// verify(...) without a mode uses Mockito's default verification mode.
294 verify(mockOnFailureCallback).accept(exceptionCaptor.capture());
295 assertEquals("Exception type", exceptionType, exceptionCaptor.getValue().getClass());
// Instance convenience overload: delegates to the static assertAssembledMessage using this test's
// mockAssembledMessageCallback.
298 private void assertAssembledMessage(final BytesMessage message, final ActorRef sender) {
299 assertAssembledMessage(mockAssembledMessageCallback, message, sender);
// Verifies that the given mock callback was invoked, and that the captured message and sender equal
// the expected ones.
//
// mockAssembledMessageCallback - Mockito mock of the assembled-message callback to verify
// message                      - the message expected to have been assembled
// sender                       - the ActorRef expected as the callback's second argument
302 static void assertAssembledMessage(final BiConsumer<Object, ActorRef> mockAssembledMessageCallback,
303 final BytesMessage message, final ActorRef sender) {
304 ArgumentCaptor<Object> assembledMessageCaptor = ArgumentCaptor.forClass(Object.class);
305 ArgumentCaptor<ActorRef> senderActorRefCaptor = ArgumentCaptor.forClass(ActorRef.class);
// Capture both arguments of the accept(...) invocation for the equality checks below.
306 verify(mockAssembledMessageCallback).accept(assembledMessageCaptor.capture(), senderActorRefCaptor.capture());
307 assertEquals("Assembled message", message, assembledMessageCaptor.getValue());
308 assertEquals("Sender ActorRef", sender, senderActorRefCaptor.getValue());
// Asserts that a reply refers to the expected client identifier and slice index.
//
// reply      - the MessageSliceReply to check
// identifier - the expected client identifier
// sliceIndex - the expected slice index
311 static void assertSuccessfulMessageSliceReply(MessageSliceReply reply, Identifier identifier, int sliceIndex) {
// The reply's identifier is cast to MessageSliceIdentifier to reach the client identifier inside it.
312 assertEquals("Identifier", identifier, ((MessageSliceIdentifier)reply.getIdentifier())
313 .getClientIdentifier());
314 assertEquals("SliceIndex", sliceIndex, reply.getSliceIndex());
// Asserts that a reply refers to the expected client identifier and carries a failure whose
// retriable flag matches isRetriable.
//
// reply       - the MessageSliceReply to check
// identifier  - the expected client identifier
// isRetriable - the expected value of the failure's isRetriable()
317 static void assertFailedMessageSliceReply(MessageSliceReply reply, Identifier identifier, boolean isRetriable) {
318 assertEquals("Identifier", identifier, ((MessageSliceIdentifier)reply.getIdentifier())
319 .getClientIdentifier());
// Presence is asserted first, so the get() on the next line is only reached when a failure exists.
320 assertEquals("Failure present", Boolean.TRUE, reply.getFailure().isPresent());
321 assertEquals("isRetriable", isRetriable, reply.getFailure().get().isRetriable());
// Asserts the header fields of a MessageSlice: client identifier, slice index, previous-slice hash
// code, reply-to actor and, unless totalSlices is DONT_CARE, the total number of slices.
//
// sliceMessage      - the MessageSlice to check
// identifier        - the expected client identifier
// sliceIndex        - the expected slice index
// totalSlices       - the expected total slice count, or DONT_CARE to skip that check
// lastSliceHashCode - the expected value of getLastSliceHashCode()
// replyTo           - the expected reply-to ActorRef
324 static void assertMessageSlice(MessageSlice sliceMessage, Identifier identifier, int sliceIndex, int totalSlices,
325 int lastSliceHashCode, ActorRef replyTo) {
326 assertEquals("Identifier", identifier, ((MessageSliceIdentifier)sliceMessage.getIdentifier())
327 .getClientIdentifier());
328 assertEquals("SliceIndex", sliceIndex, sliceMessage.getSliceIndex());
329 assertEquals("LastSliceHashCode", lastSliceHashCode, sliceMessage.getLastSliceHashCode());
330 assertEquals("ReplyTo", replyTo, sliceMessage.getReplyTo());
// Callers that do not know the slice count pass DONT_CARE to skip this check.
332 if (totalSlices != DONT_CARE) {
333 assertEquals("TotalSlices", totalSlices, sliceMessage.getTotalSlices());
// Builds a MessageSlicer with the given slice size and log context, backed by the shared
// FILE_BACKED_STREAM_FACTORY. Every caller in this file creates it in a try-with-resources statement.
//
// logContext       - log context passed to the builder
// messageSliceSize - slice size passed to the builder
337 private MessageSlicer newMessageSlicer(String logContext, final int messageSliceSize) {
338 return MessageSlicer.builder().messageSliceSize(messageSliceSize).logContext(logContext)
339 .fileBackedStreamFactory(FILE_BACKED_STREAM_FACTORY).build();