Slice front-end request messages
[controller.git] / opendaylight / md-sal / cds-access-client / src / test / java / org / opendaylight / controller / cluster / access / client / TransmittingTransmitQueueTest.java
1 /*
2  * Copyright (c) 2017 Pantheon Technologies s.r.o. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.access.client;
9
10 import static org.hamcrest.CoreMatchers.everyItem;
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertFalse;
13 import static org.junit.Assert.assertThat;
14 import static org.junit.Assert.assertTrue;
15 import static org.mockito.Matchers.any;
16 import static org.mockito.Mockito.doAnswer;
17 import static org.mockito.Mockito.doReturn;
18 import static org.mockito.Mockito.mock;
19 import static org.mockito.Mockito.verify;
20 import static org.mockito.Mockito.verifyNoMoreInteractions;
21 import static org.opendaylight.controller.cluster.access.client.ConnectionEntryMatcher.entryWithRequest;
22
23 import com.google.common.base.Ticker;
24 import com.google.common.collect.Collections2;
25 import com.google.common.collect.ImmutableList;
26 import com.google.common.testing.FakeTicker;
27 import java.util.Arrays;
28 import java.util.Collection;
29 import java.util.List;
30 import java.util.Optional;
31 import java.util.concurrent.TimeUnit;
32 import java.util.function.Consumer;
33 import org.junit.Test;
34 import org.mockito.ArgumentCaptor;
35 import org.opendaylight.controller.cluster.access.ABIVersion;
36 import org.opendaylight.controller.cluster.access.commands.ModifyTransactionRequestBuilder;
37 import org.opendaylight.controller.cluster.access.commands.TransactionPurgeRequest;
38 import org.opendaylight.controller.cluster.access.commands.TransactionPurgeResponse;
39 import org.opendaylight.controller.cluster.access.concepts.FailureEnvelope;
40 import org.opendaylight.controller.cluster.access.concepts.Request;
41 import org.opendaylight.controller.cluster.access.concepts.RequestEnvelope;
42 import org.opendaylight.controller.cluster.access.concepts.RequestException;
43 import org.opendaylight.controller.cluster.access.concepts.RequestSuccess;
44 import org.opendaylight.controller.cluster.access.concepts.Response;
45 import org.opendaylight.controller.cluster.access.concepts.SuccessEnvelope;
46 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
47 import org.opendaylight.controller.cluster.messaging.MessageSlicer;
48 import org.opendaylight.controller.cluster.messaging.SliceOptions;
49
50 public class TransmittingTransmitQueueTest extends AbstractTransmitQueueTest<TransmitQueue.Transmitting> {
51
52     private BackendInfo backendInfo;
53     private final MessageSlicer mockMessageSlicer = mock(MessageSlicer.class);
54
55     private static long now() {
56         return Ticker.systemTicker().read();
57     }
58
59     @Override
60     protected int getMaxInFlightMessages() {
61         return backendInfo.getMaxMessages();
62     }
63
64     @Override
65     protected TransmitQueue.Transmitting createQueue() {
66         doReturn(false).when(mockMessageSlicer).slice(any());
67         backendInfo = new BackendInfo(probe.ref(), 0L, ABIVersion.BORON, 3);
68         return new TransmitQueue.Transmitting(new TransmitQueue.Halted(0), 0, backendInfo, now(), mockMessageSlicer);
69     }
70
71     @Test
72     public void testComplete() throws Exception {
73         final long sequence1 = 0L;
74         final long sequence2 = 1L;
75         final Request<?, ?> request1 = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, sequence1, probe.ref());
76         final TransactionIdentifier transactionIdentifier2 = new TransactionIdentifier(HISTORY, 1L);
77         final Request<?, ?> request2 = new TransactionPurgeRequest(transactionIdentifier2, sequence2, probe.ref());
78         final Consumer<Response<?, ?>> callback1 = createConsumerMock();
79         final Consumer<Response<?, ?>> callback2 = createConsumerMock();
80         final long now1 = now();
81         final long now2 = now();
82         //enqueue 2 entries
83         queue.enqueueOrForward(new ConnectionEntry(request1, callback1, now1), now1);
84         queue.enqueueOrForward(new ConnectionEntry(request2, callback2, now2), now2);
85         final RequestSuccess<?, ?> success1 = new TransactionPurgeResponse(TRANSACTION_IDENTIFIER, sequence1);
86         final RequestSuccess<?, ?> success2 = new TransactionPurgeResponse(transactionIdentifier2, sequence2);
87         //complete entries in different order
88         final Optional<TransmittedConnectionEntry> completed2 =
89                 queue.complete(new SuccessEnvelope(success2, 0L, sequence2, 1L), now2);
90         final Optional<TransmittedConnectionEntry> completed1 =
91                 queue.complete(new SuccessEnvelope(success1, 0L, sequence1, 1L), now1);
92         //check first entry
93         final TransmittedConnectionEntry transmittedEntry1 = completed1.orElseThrow(AssertionError::new);
94         assertEquals(transmittedEntry1.getRequest(), request1);
95         assertEquals(transmittedEntry1.getTxSequence(), sequence1);
96         assertEquals(transmittedEntry1.getCallback(), callback1);
97         //check second entry
98         final TransmittedConnectionEntry transmittedEntry2 = completed2.orElseThrow(AssertionError::new);
99         assertEquals(transmittedEntry2.getRequest(), request2);
100         assertEquals(transmittedEntry2.getTxSequence(), sequence2);
101         assertEquals(transmittedEntry2.getCallback(), callback2);
102     }
103
104     @Test
105     public void testEnqueueCanTransmit() throws Exception {
106         final Request<?, ?> request = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 0L, probe.ref());
107         final Consumer<Response<?, ?>> callback = createConsumerMock();
108         final long now = now();
109         queue.enqueueOrForward(new ConnectionEntry(request, callback, now), now);
110         final RequestEnvelope requestEnvelope = probe.expectMsgClass(RequestEnvelope.class);
111         assertEquals(request, requestEnvelope.getMessage());
112     }
113
114     @Test
115     public void testEnqueueBackendFull() throws Exception {
116         final Request<?, ?> request = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 0L, probe.ref());
117         final Consumer<Response<?, ?>> callback = createConsumerMock();
118         final long now = now();
119         final int sentMessages = getMaxInFlightMessages() + 1;
120         for (int i = 0; i < sentMessages; i++) {
121             queue.enqueueOrForward(new ConnectionEntry(request, callback, now), now);
122         }
123         for (int i = 0; i < getMaxInFlightMessages(); i++) {
124             probe.expectMsgClass(RequestEnvelope.class);
125         }
126         probe.expectNoMsg();
127         final Collection<ConnectionEntry> entries = queue.drain();
128         assertEquals(sentMessages, entries.size());
129         assertThat(entries, everyItem(entryWithRequest(request)));
130     }
131
132     @Test
133     @Override
134     public void testCanTransmitCount() throws Exception {
135         assertTrue(queue.canTransmitCount(getMaxInFlightMessages() - 1) > 0);
136         assertFalse(queue.canTransmitCount(getMaxInFlightMessages()) > 0);
137     }
138
139     @Test
140     @Override
141     public void testTransmit() throws Exception {
142         final Request<?, ?> request = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 0L, probe.ref());
143         final Consumer<Response<?, ?>> callback = createConsumerMock();
144         final long now = now();
145         final ConnectionEntry entry = new ConnectionEntry(request, callback, now);
146
147         Optional<TransmittedConnectionEntry> transmitted = queue.transmit(entry, now);
148         assertTrue(transmitted.isPresent());
149         assertEquals(request, transmitted.get().getRequest());
150         assertEquals(callback, transmitted.get().getCallback());
151
152         final RequestEnvelope requestEnvelope = probe.expectMsgClass(RequestEnvelope.class);
153         assertEquals(request, requestEnvelope.getMessage());
154
155         transmitted = queue.transmit(new ConnectionEntry(new TransactionPurgeRequest(
156                 TRANSACTION_IDENTIFIER, 1L, probe.ref()), callback, now), now);
157         assertTrue(transmitted.isPresent());
158     }
159
160     @Test
161     public void testSetForwarder() throws Exception {
162         final FakeTicker ticker = new FakeTicker();
163         ticker.setAutoIncrementStep(1, TimeUnit.MICROSECONDS);
164         final Request<?, ?> request = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 0L, probe.ref());
165         final Consumer<Response<?, ?>> callback = createConsumerMock();
166         final ConnectionEntry entry = new ConnectionEntry(request, callback, ticker.read());
167         final ReconnectForwarder forwarder = mock(ReconnectForwarder.class);
168         queue.setForwarder(forwarder, ticker.read());
169         final long secondEnqueueNow = ticker.read();
170         queue.enqueueOrForward(entry, secondEnqueueNow);
171         verify(forwarder).forwardEntry(entry, secondEnqueueNow);
172     }
173
174     @Test
175     public void testCompleteOrdering() {
176         final Request<?, ?> req0 = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 0L, probe.ref());
177         final Request<?, ?> req1 = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 1L, probe.ref());
178         final Request<?, ?> req2 = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 2L, probe.ref());
179         final Request<?, ?> req3 = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 3L, probe.ref());
180         final Request<?, ?> req4 = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 4L, probe.ref());
181         final Request<?, ?> req5 = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 5L, probe.ref());
182         final Request<?, ?> req6 = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 6L, probe.ref());
183         final Consumer<Response<?, ?>> callback = createConsumerMock();
184
185         // Fill the queue up to capacity + 1
186         queue.enqueueOrForward(new ConnectionEntry(req0, callback, 0), 0);
187         queue.enqueueOrForward(new ConnectionEntry(req1, callback, 0), 0);
188         queue.enqueueOrForward(new ConnectionEntry(req2, callback, 0), 0);
189         queue.enqueueOrForward(new ConnectionEntry(req3, callback, 0), 0);
190         assertEqualRequests(queue.getInflight(), req0, req1, req2);
191         assertEqualRequests(queue.getPending(), req3);
192
193         // Now complete req0, which should transmit req3
194         queue.complete(new FailureEnvelope(req0.toRequestFailure(mock(RequestException.class)), 0, 0, 0), 0);
195         assertEqualRequests(queue.getInflight(), req1, req2, req3);
196         assertEqualRequests(queue.getPending());
197
198         // Now complete req1, which should leave an empty slot
199         queue.complete(new FailureEnvelope(req1.toRequestFailure(mock(RequestException.class)), 0, 1, 0), 0);
200         assertEqualRequests(queue.getInflight(), req2, req3);
201         assertEqualRequests(queue.getPending());
202
203         // Enqueue req4, which should be immediately transmitted
204         queue.enqueueOrForward(new ConnectionEntry(req4, callback, 0), 0);
205         assertEqualRequests(queue.getInflight(), req2, req3, req4);
206         assertEqualRequests(queue.getPending());
207
208         // Enqueue req5, which should move to pending
209         queue.enqueueOrForward(new ConnectionEntry(req5, callback, 0), 0);
210         assertEqualRequests(queue.getInflight(), req2, req3, req4);
211         assertEqualRequests(queue.getPending(), req5);
212
213         // Remove req4, creating an inconsistency...
214         queue.getInflight().removeLast();
215         assertEqualRequests(queue.getInflight(), req2, req3);
216         assertEqualRequests(queue.getPending(), req5);
217
218         // ... and enqueue req6, which should cause req5 to be transmitted
219         queue.enqueueOrForward(new ConnectionEntry(req6, callback, 0), 0);
220         assertEqualRequests(queue.getInflight(), req2, req3, req5);
221         assertEqualRequests(queue.getPending(), req6);
222     }
223
224     @Test
225     public void testRequestSlicingOnTransmit() throws Exception {
226         doReturn(true).when(mockMessageSlicer).slice(any());
227
228         ModifyTransactionRequestBuilder reqBuilder = new ModifyTransactionRequestBuilder(
229                 TRANSACTION_IDENTIFIER, probe.ref());
230         reqBuilder.setSequence(0L);
231         final Request<?, ?> request = reqBuilder.build();
232
233         final long now = now();
234         final Consumer<Response<?, ?>> mockConsumer = createConsumerMock();
235         Optional<TransmittedConnectionEntry> transmitted =
236                 queue.transmit(new ConnectionEntry(request, mockConsumer, now), now);
237         assertTrue(transmitted.isPresent());
238
239         ArgumentCaptor<SliceOptions> sliceOptions = ArgumentCaptor.forClass(SliceOptions.class);
240         verify(mockMessageSlicer).slice(sliceOptions.capture());
241         assertTrue(sliceOptions.getValue().getMessage() instanceof RequestEnvelope);
242         RequestEnvelope requestEnvelope = (RequestEnvelope) sliceOptions.getValue().getMessage();
243         assertEquals(request, requestEnvelope.getMessage());
244
245         final Request<?, ?> request2 = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 1L, probe.ref());
246         transmitted = queue.transmit(new ConnectionEntry(request2, mockConsumer, now), now);
247         assertFalse(transmitted.isPresent());
248     }
249
250     @Test
251     public void testSlicingFailureOnTransmit() throws Exception {
252         doAnswer(invocation -> {
253             invocation.getArgumentAt(0, SliceOptions.class).getOnFailureCallback().accept(new Exception("mock"));
254             return Boolean.FALSE;
255         }).when(mockMessageSlicer).slice(any());
256
257         ModifyTransactionRequestBuilder reqBuilder = new ModifyTransactionRequestBuilder(
258                 TRANSACTION_IDENTIFIER, probe.ref());
259         reqBuilder.setSequence(0L);
260
261         final long now = now();
262         Optional<TransmittedConnectionEntry> transmitted =
263                 queue.transmit(new ConnectionEntry(reqBuilder.build(), createConsumerMock(), now), now);
264         assertTrue(transmitted.isPresent());
265
266         verify(mockMessageSlicer).slice(any());
267
268         probe.expectMsgClass(FailureEnvelope.class);
269     }
270
271     @Test
272     public void testSlicedRequestOnComplete() throws Exception {
273         doReturn(true).when(mockMessageSlicer).slice(any());
274
275         ModifyTransactionRequestBuilder reqBuilder = new ModifyTransactionRequestBuilder(
276                 TRANSACTION_IDENTIFIER, probe.ref());
277         reqBuilder.setSequence(0L);
278         final Request<?, ?> request = reqBuilder.build();
279
280         final long now = now();
281         final Consumer<Response<?, ?>> mockConsumer = createConsumerMock();
282         queue.enqueueOrForward(new ConnectionEntry(request, mockConsumer, now), now);
283
284         ArgumentCaptor<SliceOptions> sliceOptions = ArgumentCaptor.forClass(SliceOptions.class);
285         verify(mockMessageSlicer).slice(sliceOptions.capture());
286         assertTrue(sliceOptions.getValue().getMessage() instanceof RequestEnvelope);
287
288         final Request<?, ?> request2 = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 1L, probe.ref());
289         queue.enqueueOrForward(new ConnectionEntry(request2, mockConsumer, now), now);
290         verifyNoMoreInteractions(mockMessageSlicer);
291         probe.expectNoMsg();
292
293         RequestEnvelope requestEnvelope = (RequestEnvelope) sliceOptions.getValue().getMessage();
294         queue.complete(new FailureEnvelope(request.toRequestFailure(mock(RequestException.class)),
295                 requestEnvelope.getSessionId(), requestEnvelope.getTxSequence(), 0), 0);
296
297         requestEnvelope = probe.expectMsgClass(RequestEnvelope.class);
298         assertEquals(request2, requestEnvelope.getMessage());
299
300         final Request<?, ?> request3 = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 3L, probe.ref());
301         queue.enqueueOrForward(new ConnectionEntry(request3, mockConsumer, now), now);
302
303         requestEnvelope = probe.expectMsgClass(RequestEnvelope.class);
304         assertEquals(request3, requestEnvelope.getMessage());
305     }
306
307     private static void assertEqualRequests(final Collection<? extends ConnectionEntry> queue,
308             final Request<?, ?>... requests) {
309         final List<Request<?, ?>> queued = ImmutableList.copyOf(Collections2.transform(queue,
310             ConnectionEntry::getRequest));
311         assertEquals(Arrays.asList(requests), queued);
312     }
313 }