Remove unused exceptions
[controller.git] / opendaylight / md-sal / sal-clustering-commons / src / test / java / org / opendaylight / controller / cluster / messaging / MessageAssemblerTest.java
1 /*
2  * Copyright (c) 2017 Inocybe Technologies and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.messaging;
9
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.assertFalse;
12 import static org.junit.Assert.assertTrue;
13 import static org.mockito.Matchers.any;
14 import static org.mockito.Matchers.anyInt;
15 import static org.mockito.Mockito.doNothing;
16 import static org.mockito.Mockito.doReturn;
17 import static org.mockito.Mockito.doThrow;
18 import static org.mockito.Mockito.spy;
19 import static org.mockito.Mockito.verify;
20 import static org.opendaylight.controller.cluster.messaging.MessageSlicingIntegrationTest.assertAssembledMessage;
21 import static org.opendaylight.controller.cluster.messaging.MessageSlicingIntegrationTest.assertFailedMessageSliceReply;
22 import static org.opendaylight.controller.cluster.messaging.MessageSlicingIntegrationTest.assertSuccessfulMessageSliceReply;
23
24 import akka.actor.ActorRef;
25 import com.google.common.util.concurrent.Uninterruptibles;
26 import java.io.IOException;
27 import java.util.concurrent.TimeUnit;
28 import java.util.function.BiConsumer;
29 import org.apache.commons.lang3.SerializationUtils;
30 import org.junit.Before;
31 import org.junit.Test;
32 import org.mockito.Mock;
33 import org.opendaylight.controller.cluster.io.FileBackedOutputStream;
34 import org.opendaylight.controller.cluster.messaging.MessageAssembler.Builder;
35
36 /**
37  * Unit tests for MessageAssembler.
38  *
39  * @author Thomas Pantelis
40  */
41 public class MessageAssemblerTest extends AbstractMessagingTest {
42
43     @Mock
44     private BiConsumer<Object, ActorRef> mockAssembledMessageCallback;
45
46     @Override
47     @Before
48     public void setup() throws IOException {
49         super.setup();
50
51         doNothing().when(mockAssembledMessageCallback).accept(any(Object.class), any(ActorRef.class));
52     }
53
54     @Test
55     public void testHandledMessages() {
56         final MessageSlice messageSlice = new MessageSlice(IDENTIFIER, new byte[0], 1, 1, 1, testProbe.ref());
57         final AbortSlicing abortSlicing = new AbortSlicing(IDENTIFIER);
58         assertEquals("isHandledMessage", Boolean.TRUE, MessageAssembler.isHandledMessage(messageSlice));
59         assertEquals("isHandledMessage", Boolean.TRUE, MessageAssembler.isHandledMessage(abortSlicing));
60         assertEquals("isHandledMessage", Boolean.FALSE, MessageSlicer.isHandledMessage(new Object()));
61
62         try (MessageAssembler assembler = newMessageAssembler("testHandledMessages")) {
63             assertEquals("handledMessage", Boolean.TRUE, assembler.handleMessage(messageSlice, testProbe.ref()));
64             assertEquals("handledMessage", Boolean.TRUE, assembler.handleMessage(abortSlicing, testProbe.ref()));
65             assertEquals("handledMessage", Boolean.FALSE, assembler.handleMessage(new Object(), testProbe.ref()));
66         }
67     }
68
69     @Test
70     public void testSingleMessageSlice() {
71         try (MessageAssembler assembler = newMessageAssembler("testSingleMessageSlice")) {
72             final FileBackedOutputStream fileBackStream = spy(new FileBackedOutputStream(100000000, null));
73             doReturn(fileBackStream).when(mockFiledBackedStreamFactory).newInstance();
74
75             final MessageSliceIdentifier identifier = new MessageSliceIdentifier(IDENTIFIER, 1);
76             final BytesMessage message = new BytesMessage(new byte[]{1, 2, 3});
77
78             final MessageSlice messageSlice = new MessageSlice(identifier, SerializationUtils.serialize(message), 1, 1,
79                     SlicedMessageState.INITIAL_SLICE_HASH_CODE, testProbe.ref());
80             assembler.handleMessage(messageSlice, testProbe.ref());
81
82             final MessageSliceReply reply = testProbe.expectMsgClass(MessageSliceReply.class);
83             assertSuccessfulMessageSliceReply(reply, IDENTIFIER, 1);
84
85             assertAssembledMessage(mockAssembledMessageCallback, message, testProbe.ref());
86
87             assertFalse("MessageAssembler did not remove state for " + identifier, assembler.hasState(identifier));
88             verify(fileBackStream).cleanup();
89         }
90     }
91
92     @Test
93     public void testMessageSliceWithByteSourceFailure() throws IOException {
94         try (MessageAssembler assembler = newMessageAssembler("testMessageSliceWithByteSourceFailure")) {
95             IOException mockFailure = new IOException("mock IOException");
96             doThrow(mockFailure).when(mockByteSource).openStream();
97             doThrow(mockFailure).when(mockByteSource).openBufferedStream();
98
99             final MessageSliceIdentifier identifier = new MessageSliceIdentifier(IDENTIFIER, 1);
100             final BytesMessage message = new BytesMessage(new byte[]{1, 2, 3});
101
102             final MessageSlice messageSlice = new MessageSlice(identifier, SerializationUtils.serialize(message), 1, 1,
103                     SlicedMessageState.INITIAL_SLICE_HASH_CODE, testProbe.ref());
104             assembler.handleMessage(messageSlice, testProbe.ref());
105
106             final MessageSliceReply reply = testProbe.expectMsgClass(MessageSliceReply.class);
107             assertFailedMessageSliceReply(reply, IDENTIFIER, false);
108             assertEquals("Failure cause", mockFailure, reply.getFailure().get().getCause());
109
110             assertFalse("MessageAssembler did not remove state for " + identifier, assembler.hasState(identifier));
111             verify(mockFiledBackedStream).cleanup();
112         }
113     }
114
115     @Test
116     public void testMessageSliceWithStreamWriteFailure() throws IOException {
117         try (MessageAssembler assembler = newMessageAssembler("testMessageSliceWithStreamWriteFailure")) {
118             IOException mockFailure = new IOException("mock IOException");
119             doThrow(mockFailure).when(mockFiledBackedStream).write(any(byte[].class), anyInt(), anyInt());
120             doThrow(mockFailure).when(mockFiledBackedStream).write(any(byte[].class));
121             doThrow(mockFailure).when(mockFiledBackedStream).write(anyInt());
122             doThrow(mockFailure).when(mockFiledBackedStream).flush();
123
124             final MessageSliceIdentifier identifier = new MessageSliceIdentifier(IDENTIFIER, 1);
125             final BytesMessage message = new BytesMessage(new byte[]{1, 2, 3});
126
127             final MessageSlice messageSlice = new MessageSlice(identifier, SerializationUtils.serialize(message), 1, 1,
128                     SlicedMessageState.INITIAL_SLICE_HASH_CODE, testProbe.ref());
129             assembler.handleMessage(messageSlice, testProbe.ref());
130
131             final MessageSliceReply reply = testProbe.expectMsgClass(MessageSliceReply.class);
132             assertFailedMessageSliceReply(reply, IDENTIFIER, false);
133             assertEquals("Failure cause", mockFailure, reply.getFailure().get().getCause());
134
135             assertFalse("MessageAssembler did not remove state for " + identifier, assembler.hasState(identifier));
136             verify(mockFiledBackedStream).cleanup();
137         }
138     }
139
140     @Test
141     public void testAssembledMessageStateExpiration() {
142         final int expiryDuration = 200;
143         try (MessageAssembler assembler = newMessageAssemblerBuilder("testAssembledMessageStateExpiration")
144                 .expireStateAfterInactivity(expiryDuration, TimeUnit.MILLISECONDS).build()) {
145             final MessageSliceIdentifier identifier = new MessageSliceIdentifier(IDENTIFIER, 1);
146             final BytesMessage message = new BytesMessage(new byte[]{1, 2, 3});
147
148             final MessageSlice messageSlice = new MessageSlice(identifier, SerializationUtils.serialize(message), 1, 2,
149                     SlicedMessageState.INITIAL_SLICE_HASH_CODE, testProbe.ref());
150             assembler.handleMessage(messageSlice, testProbe.ref());
151
152             final MessageSliceReply reply = testProbe.expectMsgClass(MessageSliceReply.class);
153             assertSuccessfulMessageSliceReply(reply, IDENTIFIER, 1);
154
155             assertTrue("MessageAssembler should have remove state for " + identifier, assembler.hasState(identifier));
156             Uninterruptibles.sleepUninterruptibly(expiryDuration + 50, TimeUnit.MILLISECONDS);
157             assertFalse("MessageAssembler did not remove state for " + identifier, assembler.hasState(identifier));
158
159             verify(mockFiledBackedStream).cleanup();
160         }
161     }
162
163     @Test
164     public void testFirstMessageSliceWithInvalidIndex() {
165         try (MessageAssembler assembler = newMessageAssembler("testFirstMessageSliceWithInvalidIndex")) {
166             final MessageSliceIdentifier identifier = new MessageSliceIdentifier(IDENTIFIER, 1);
167             final MessageSlice messageSlice = new MessageSlice(identifier, new byte[0], 2, 3, 1, testProbe.ref());
168             assembler.handleMessage(messageSlice, testProbe.ref());
169
170             final MessageSliceReply reply = testProbe.expectMsgClass(MessageSliceReply.class);
171             assertFailedMessageSliceReply(reply, IDENTIFIER, true);
172             assertFalse("MessageAssembler should not have state for " + identifier, assembler.hasState(identifier));
173         }
174     }
175
176     private MessageAssembler newMessageAssembler(String logContext) {
177         return newMessageAssemblerBuilder(logContext).build();
178     }
179
180     private Builder newMessageAssemblerBuilder(String logContext) {
181         return MessageAssembler.builder().fileBackedStreamFactory(mockFiledBackedStreamFactory)
182                 .assembledMessageCallback(mockAssembledMessageCallback).logContext(logContext);
183     }
184 }