2 * Copyright (c) 2017 Inocybe Technologies and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.messaging;
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.assertFalse;
12 import static org.junit.Assert.assertTrue;
13 import static org.mockito.Matchers.any;
14 import static org.mockito.Matchers.anyInt;
15 import static org.mockito.Mockito.doNothing;
16 import static org.mockito.Mockito.doReturn;
17 import static org.mockito.Mockito.doThrow;
18 import static org.mockito.Mockito.spy;
19 import static org.mockito.Mockito.verify;
20 import static org.opendaylight.controller.cluster.messaging.MessageSlicingIntegrationTest.assertAssembledMessage;
21 import static org.opendaylight.controller.cluster.messaging.MessageSlicingIntegrationTest.assertFailedMessageSliceReply;
22 import static org.opendaylight.controller.cluster.messaging.MessageSlicingIntegrationTest.assertSuccessfulMessageSliceReply;
24 import akka.actor.ActorRef;
25 import com.google.common.util.concurrent.Uninterruptibles;
26 import java.io.IOException;
27 import java.util.concurrent.TimeUnit;
28 import java.util.function.BiConsumer;
29 import org.apache.commons.lang3.SerializationUtils;
30 import org.junit.Before;
31 import org.junit.Test;
32 import org.mockito.Mock;
33 import org.opendaylight.controller.cluster.io.FileBackedOutputStream;
34 import org.opendaylight.controller.cluster.messaging.MessageAssembler.Builder;
37 * Unit tests for MessageAssembler.
39 * @author Thomas Pantelis
41 public class MessageAssemblerTest extends AbstractMessagingTest {
// Callback handed to the assembler builder through assembledMessageCallback(...); the tests stub it and
// later assert on the assembled message it was given.
44 private BiConsumer<Object, ActorRef> mockAssembledMessageCallback;
// Fixture setup for the tests in this class.
// NOTE(review): the embedded line numbering jumps from 48 to 51, so one or more statements may be missing
// from this view — confirm against the complete file before relying on this method as shown.
48 public void setup() throws IOException {
// Stub accept(Object, ActorRef) on the assembled-message callback so that invoking it does nothing.
51 doNothing().when(mockAssembledMessageCallback).accept(any(Object.class), any(ActorRef.class));
55 public void testHandledMessages() {
56 final MessageSlice messageSlice = new MessageSlice(IDENTIFIER, new byte[0], 1, 1, 1, testProbe.ref());
57 final AbortSlicing abortSlicing = new AbortSlicing(IDENTIFIER);
58 assertEquals("isHandledMessage", Boolean.TRUE, MessageAssembler.isHandledMessage(messageSlice));
59 assertEquals("isHandledMessage", Boolean.TRUE, MessageAssembler.isHandledMessage(abortSlicing));
60 assertEquals("isHandledMessage", Boolean.FALSE, MessageSlicer.isHandledMessage(new Object()));
62 try (MessageAssembler assembler = newMessageAssembler("testHandledMessages")) {
63 assertEquals("handledMessage", Boolean.TRUE, assembler.handleMessage(messageSlice, testProbe.ref()));
64 assertEquals("handledMessage", Boolean.TRUE, assembler.handleMessage(abortSlicing, testProbe.ref()));
65 assertEquals("handledMessage", Boolean.FALSE, assembler.handleMessage(new Object(), testProbe.ref()));
70 public void testSingleMessageSlice() {
71 try (MessageAssembler assembler = newMessageAssembler("testSingleMessageSlice")) {
72 final FileBackedOutputStream fileBackStream = spy(new FileBackedOutputStream(100000000, null));
73 doReturn(fileBackStream).when(mockFiledBackedStreamFactory).newInstance();
75 final MessageSliceIdentifier identifier = new MessageSliceIdentifier(IDENTIFIER);
76 final BytesMessage message = new BytesMessage(new byte[]{1, 2, 3});
78 final MessageSlice messageSlice = new MessageSlice(identifier, SerializationUtils.serialize(message), 1, 1,
79 SlicedMessageState.INITIAL_SLICE_HASH_CODE, testProbe.ref());
80 assembler.handleMessage(messageSlice, testProbe.ref());
82 final MessageSliceReply reply = testProbe.expectMsgClass(MessageSliceReply.class);
83 assertSuccessfulMessageSliceReply(reply, IDENTIFIER, 1);
85 assertAssembledMessage(mockAssembledMessageCallback, message, testProbe.ref());
87 assertFalse("MessageAssembler did not remove state for " + identifier, assembler.hasState(identifier));
88 verify(fileBackStream).cleanup();
93 public void testMessageSliceWithByteSourceFailure() throws IOException {
94 try (MessageAssembler assembler = newMessageAssembler("testMessageSliceWithByteSourceFailure")) {
95 IOException mockFailure = new IOException("mock IOException");
96 doThrow(mockFailure).when(mockByteSource).openStream();
97 doThrow(mockFailure).when(mockByteSource).openBufferedStream();
99 final MessageSliceIdentifier identifier = new MessageSliceIdentifier(IDENTIFIER);
100 final BytesMessage message = new BytesMessage(new byte[]{1, 2, 3});
102 final MessageSlice messageSlice = new MessageSlice(identifier, SerializationUtils.serialize(message), 1, 1,
103 SlicedMessageState.INITIAL_SLICE_HASH_CODE, testProbe.ref());
104 assembler.handleMessage(messageSlice, testProbe.ref());
106 final MessageSliceReply reply = testProbe.expectMsgClass(MessageSliceReply.class);
107 assertFailedMessageSliceReply(reply, IDENTIFIER, false);
108 assertEquals("Failure cause", mockFailure, reply.getFailure().get().getCause());
110 assertFalse("MessageAssembler did not remove state for " + identifier, assembler.hasState(identifier));
111 verify(mockFiledBackedStream).cleanup();
116 public void testMessageSliceWithStreamWriteFailure() throws IOException {
117 try (MessageAssembler assembler = newMessageAssembler("testMessageSliceWithStreamWriteFailure")) {
118 IOException mockFailure = new IOException("mock IOException");
119 doThrow(mockFailure).when(mockFiledBackedStream).write(any(byte[].class), anyInt(), anyInt());
120 doThrow(mockFailure).when(mockFiledBackedStream).write(any(byte[].class));
121 doThrow(mockFailure).when(mockFiledBackedStream).write(anyInt());
122 doThrow(mockFailure).when(mockFiledBackedStream).flush();
124 final MessageSliceIdentifier identifier = new MessageSliceIdentifier(IDENTIFIER);
125 final BytesMessage message = new BytesMessage(new byte[]{1, 2, 3});
127 final MessageSlice messageSlice = new MessageSlice(identifier, SerializationUtils.serialize(message), 1, 1,
128 SlicedMessageState.INITIAL_SLICE_HASH_CODE, testProbe.ref());
129 assembler.handleMessage(messageSlice, testProbe.ref());
131 final MessageSliceReply reply = testProbe.expectMsgClass(MessageSliceReply.class);
132 assertFailedMessageSliceReply(reply, IDENTIFIER, false);
133 assertEquals("Failure cause", mockFailure, reply.getFailure().get().getCause());
135 assertFalse("MessageAssembler did not remove state for " + identifier, assembler.hasState(identifier));
136 verify(mockFiledBackedStream).cleanup();
141 public void testAssembledMessageStateExpiration() throws IOException {
142 final int expiryDuration = 200;
143 try (MessageAssembler assembler = newMessageAssemblerBuilder("testAssembledMessageStateExpiration")
144 .expireStateAfterInactivity(expiryDuration, TimeUnit.MILLISECONDS).build()) {
145 final MessageSliceIdentifier identifier = new MessageSliceIdentifier(IDENTIFIER);
146 final BytesMessage message = new BytesMessage(new byte[]{1, 2, 3});
148 final MessageSlice messageSlice = new MessageSlice(identifier, SerializationUtils.serialize(message), 1, 2,
149 SlicedMessageState.INITIAL_SLICE_HASH_CODE, testProbe.ref());
150 assembler.handleMessage(messageSlice, testProbe.ref());
152 final MessageSliceReply reply = testProbe.expectMsgClass(MessageSliceReply.class);
153 assertSuccessfulMessageSliceReply(reply, IDENTIFIER, 1);
155 assertTrue("MessageAssembler should have remove state for " + identifier, assembler.hasState(identifier));
156 Uninterruptibles.sleepUninterruptibly(expiryDuration + 50, TimeUnit.MILLISECONDS);
157 assertFalse("MessageAssembler did not remove state for " + identifier, assembler.hasState(identifier));
159 verify(mockFiledBackedStream).cleanup();
164 public void testFirstMessageSliceWithInvalidIndex() {
165 try (MessageAssembler assembler = newMessageAssembler("testFirstMessageSliceWithInvalidIndex")) {
166 final MessageSliceIdentifier identifier = new MessageSliceIdentifier(IDENTIFIER);
167 final MessageSlice messageSlice = new MessageSlice(identifier, new byte[0], 2, 3, 1, testProbe.ref());
168 assembler.handleMessage(messageSlice, testProbe.ref());
170 final MessageSliceReply reply = testProbe.expectMsgClass(MessageSliceReply.class);
171 assertFailedMessageSliceReply(reply, IDENTIFIER, true);
172 assertFalse("MessageAssembler should not have state for " + identifier, assembler.hasState(identifier));
176 private MessageAssembler newMessageAssembler(String logContext) {
177 return newMessageAssemblerBuilder(logContext).build();
180 private Builder newMessageAssemblerBuilder(String logContext) {
181 return MessageAssembler.builder().filedBackedStreamFactory(mockFiledBackedStreamFactory)
182 .assembledMessageCallback(mockAssembledMessageCallback).logContext(logContext);