2 * Copyright (c) 2017 Pantheon Technologies s.r.o. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.access.client;
10 import static org.hamcrest.CoreMatchers.everyItem;
11 import static org.hamcrest.MatcherAssert.assertThat;
12 import static org.junit.Assert.assertEquals;
13 import static org.junit.Assert.assertFalse;
14 import static org.junit.Assert.assertTrue;
15 import static org.mockito.ArgumentMatchers.any;
16 import static org.mockito.Mockito.doAnswer;
17 import static org.mockito.Mockito.doReturn;
18 import static org.mockito.Mockito.mock;
19 import static org.mockito.Mockito.verify;
20 import static org.mockito.Mockito.verifyNoMoreInteractions;
21 import static org.opendaylight.controller.cluster.access.client.ConnectionEntryMatcher.entryWithRequest;
23 import com.google.common.base.Ticker;
24 import com.google.common.collect.Collections2;
25 import com.google.common.collect.ImmutableList;
26 import com.google.common.testing.FakeTicker;
27 import java.util.Arrays;
28 import java.util.Collection;
29 import java.util.List;
30 import java.util.Optional;
31 import java.util.concurrent.TimeUnit;
32 import java.util.function.Consumer;
33 import org.junit.Test;
34 import org.mockito.ArgumentCaptor;
35 import org.opendaylight.controller.cluster.access.ABIVersion;
36 import org.opendaylight.controller.cluster.access.commands.ModifyTransactionRequestBuilder;
37 import org.opendaylight.controller.cluster.access.commands.TransactionPurgeRequest;
38 import org.opendaylight.controller.cluster.access.commands.TransactionPurgeResponse;
39 import org.opendaylight.controller.cluster.access.concepts.FailureEnvelope;
40 import org.opendaylight.controller.cluster.access.concepts.Request;
41 import org.opendaylight.controller.cluster.access.concepts.RequestEnvelope;
42 import org.opendaylight.controller.cluster.access.concepts.RequestException;
43 import org.opendaylight.controller.cluster.access.concepts.RequestSuccess;
44 import org.opendaylight.controller.cluster.access.concepts.Response;
45 import org.opendaylight.controller.cluster.access.concepts.SuccessEnvelope;
46 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
47 import org.opendaylight.controller.cluster.messaging.MessageSlicer;
48 import org.opendaylight.controller.cluster.messaging.SliceOptions;
/**
 * Unit tests for {@link TransmitQueue.Transmitting}, built on top of {@code AbstractTransmitQueueTest}.
 */
50 public class TransmittingTransmitQueueTest extends AbstractTransmitQueueTest<TransmitQueue.Transmitting> {
// Backend descriptor; assigned in createQueue() and read by getMaxInFlightMessages().
52 private BackendInfo backendInfo;
// Mockito mock handed to the queue under test. createQueue() stubs slice(any()) to return false;
// individual tests re-stub it as needed.
53 private final MessageSlicer mockMessageSlicer = mock(MessageSlicer.class);
55 private static long now() {
56 return Ticker.systemTicker().read();
60 protected int getMaxInFlightMessages() {
61 return backendInfo.getMaxMessages();
65 protected TransmitQueue.Transmitting createQueue() {
66 doReturn(false).when(mockMessageSlicer).slice(any());
67 backendInfo = new BackendInfo(probe.ref(), "test", 0L, ABIVersion.current(), 3);
68 return new TransmitQueue.Transmitting(new TransmitQueue.Halted(0), 0, backendInfo, now(), mockMessageSlicer);
72 public void testComplete() {
73 final long sequence1 = 0L;
74 final long sequence2 = 1L;
75 final Request<?, ?> request1 = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, sequence1, probe.ref());
76 final TransactionIdentifier transactionIdentifier2 = new TransactionIdentifier(HISTORY, 1L);
77 final Request<?, ?> request2 = new TransactionPurgeRequest(transactionIdentifier2, sequence2, probe.ref());
78 final Consumer<Response<?, ?>> callback1 = createConsumerMock();
79 final Consumer<Response<?, ?>> callback2 = createConsumerMock();
80 final long now1 = now();
81 final long now2 = now();
83 queue.enqueueOrForward(new ConnectionEntry(request1, callback1, now1), now1);
84 queue.enqueueOrForward(new ConnectionEntry(request2, callback2, now2), now2);
85 final RequestSuccess<?, ?> success1 = new TransactionPurgeResponse(TRANSACTION_IDENTIFIER, sequence1);
86 final RequestSuccess<?, ?> success2 = new TransactionPurgeResponse(transactionIdentifier2, sequence2);
87 //complete entries in different order
88 final Optional<TransmittedConnectionEntry> completed2 =
89 queue.complete(new SuccessEnvelope(success2, 0L, sequence2, 1L), now2);
90 final Optional<TransmittedConnectionEntry> completed1 =
91 queue.complete(new SuccessEnvelope(success1, 0L, sequence1, 1L), now1);
93 final TransmittedConnectionEntry transmittedEntry1 = completed1.orElseThrow(AssertionError::new);
94 assertEquals(transmittedEntry1.getRequest(), request1);
95 assertEquals(transmittedEntry1.getTxSequence(), sequence1);
96 assertEquals(transmittedEntry1.getCallback(), callback1);
98 final TransmittedConnectionEntry transmittedEntry2 = completed2.orElseThrow(AssertionError::new);
99 assertEquals(transmittedEntry2.getRequest(), request2);
100 assertEquals(transmittedEntry2.getTxSequence(), sequence2);
101 assertEquals(transmittedEntry2.getCallback(), callback2);
105 public void testEnqueueCanTransmit() {
106 final Request<?, ?> request = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 0L, probe.ref());
107 final Consumer<Response<?, ?>> callback = createConsumerMock();
108 final long now = now();
109 queue.enqueueOrForward(new ConnectionEntry(request, callback, now), now);
110 final RequestEnvelope requestEnvelope = probe.expectMsgClass(RequestEnvelope.class);
111 assertEquals(request, requestEnvelope.getMessage());
115 public void testEnqueueBackendFull() {
116 final Request<?, ?> request = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 0L, probe.ref());
117 final Consumer<Response<?, ?>> callback = createConsumerMock();
118 final long now = now();
119 final int sentMessages = getMaxInFlightMessages() + 1;
120 for (int i = 0; i < sentMessages; i++) {
121 queue.enqueueOrForward(new ConnectionEntry(request, callback, now), now);
123 for (int i = 0; i < getMaxInFlightMessages(); i++) {
124 probe.expectMsgClass(RequestEnvelope.class);
126 probe.expectNoMessage();
127 final Collection<ConnectionEntry> entries = queue.drain();
128 assertEquals(sentMessages, entries.size());
129 assertThat(entries, everyItem(entryWithRequest(request)));
134 public void testCanTransmitCount() {
135 assertTrue(queue.canTransmitCount(getMaxInFlightMessages() - 1) > 0);
136 assertFalse(queue.canTransmitCount(getMaxInFlightMessages()) > 0);
141 public void testTransmit() {
142 final Request<?, ?> request = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 0L, probe.ref());
143 final Consumer<Response<?, ?>> callback = createConsumerMock();
144 final long now = now();
145 final ConnectionEntry entry = new ConnectionEntry(request, callback, now);
147 Optional<TransmittedConnectionEntry> transmitted = queue.transmit(entry, now);
148 assertTrue(transmitted.isPresent());
149 assertEquals(request, transmitted.orElseThrow().getRequest());
150 assertEquals(callback, transmitted.orElseThrow().getCallback());
152 final RequestEnvelope requestEnvelope = probe.expectMsgClass(RequestEnvelope.class);
153 assertEquals(request, requestEnvelope.getMessage());
155 transmitted = queue.transmit(new ConnectionEntry(new TransactionPurgeRequest(
156 TRANSACTION_IDENTIFIER, 1L, probe.ref()), callback, now), now);
157 assertTrue(transmitted.isPresent());
161 public void testSetForwarder() {
162 final FakeTicker ticker = new FakeTicker();
163 ticker.setAutoIncrementStep(1, TimeUnit.MICROSECONDS);
164 final Request<?, ?> request = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 0L, probe.ref());
165 final Consumer<Response<?, ?>> callback = createConsumerMock();
166 final ConnectionEntry entry = new ConnectionEntry(request, callback, ticker.read());
167 final ReconnectForwarder forwarder = mock(ReconnectForwarder.class);
168 queue.setForwarder(forwarder, ticker.read());
169 final long secondEnqueueNow = ticker.read();
170 queue.enqueueOrForward(entry, secondEnqueueNow);
171 verify(forwarder).forwardEntry(entry, secondEnqueueNow);
175 public void testCompleteOrdering() {
176 final Request<?, ?> req0 = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 0L, probe.ref());
177 final Request<?, ?> req1 = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 1L, probe.ref());
178 final Request<?, ?> req2 = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 2L, probe.ref());
179 final Request<?, ?> req3 = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 3L, probe.ref());
180 final Request<?, ?> req4 = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 4L, probe.ref());
181 final Request<?, ?> req5 = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 5L, probe.ref());
182 final Request<?, ?> req6 = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 6L, probe.ref());
183 final Consumer<Response<?, ?>> callback = createConsumerMock();
185 // Fill the queue up to capacity + 1
186 queue.enqueueOrForward(new ConnectionEntry(req0, callback, 0), 0);
187 queue.enqueueOrForward(new ConnectionEntry(req1, callback, 0), 0);
188 queue.enqueueOrForward(new ConnectionEntry(req2, callback, 0), 0);
189 queue.enqueueOrForward(new ConnectionEntry(req3, callback, 0), 0);
190 assertEqualRequests(queue.getInflight(), req0, req1, req2);
191 assertEqualRequests(queue.getPending(), req3);
193 // Now complete req0, which should transmit req3
194 queue.complete(new FailureEnvelope(req0.toRequestFailure(mock(RequestException.class)), 0, 0, 0), 0);
195 assertEqualRequests(queue.getInflight(), req1, req2, req3);
196 assertEqualRequests(queue.getPending());
198 // Now complete req1, which should leave an empty slot
199 queue.complete(new FailureEnvelope(req1.toRequestFailure(mock(RequestException.class)), 0, 1, 0), 0);
200 assertEqualRequests(queue.getInflight(), req2, req3);
201 assertEqualRequests(queue.getPending());
203 // Enqueue req4, which should be immediately transmitted
204 queue.enqueueOrForward(new ConnectionEntry(req4, callback, 0), 0);
205 assertEqualRequests(queue.getInflight(), req2, req3, req4);
206 assertEqualRequests(queue.getPending());
208 // Enqueue req5, which should move to pending
209 queue.enqueueOrForward(new ConnectionEntry(req5, callback, 0), 0);
210 assertEqualRequests(queue.getInflight(), req2, req3, req4);
211 assertEqualRequests(queue.getPending(), req5);
213 // Remove req4, creating an inconsistency...
214 queue.getInflight().removeLast();
215 assertEqualRequests(queue.getInflight(), req2, req3);
216 assertEqualRequests(queue.getPending(), req5);
218 // ... and enqueue req6, which should cause req5 to be transmitted
219 queue.enqueueOrForward(new ConnectionEntry(req6, callback, 0), 0);
220 assertEqualRequests(queue.getInflight(), req2, req3, req5);
221 assertEqualRequests(queue.getPending(), req6);
225 public void testRequestSlicingOnTransmit() {
226 doReturn(true).when(mockMessageSlicer).slice(any());
228 ModifyTransactionRequestBuilder reqBuilder = new ModifyTransactionRequestBuilder(
229 TRANSACTION_IDENTIFIER, probe.ref());
230 reqBuilder.setSequence(0L);
231 final Request<?, ?> request = reqBuilder.build();
233 final long now = now();
234 final Consumer<Response<?, ?>> mockConsumer = createConsumerMock();
235 Optional<TransmittedConnectionEntry> transmitted =
236 queue.transmit(new ConnectionEntry(request, mockConsumer, now), now);
237 assertTrue(transmitted.isPresent());
239 ArgumentCaptor<SliceOptions> sliceOptions = ArgumentCaptor.forClass(SliceOptions.class);
240 verify(mockMessageSlicer).slice(sliceOptions.capture());
241 assertTrue(sliceOptions.getValue().getMessage() instanceof RequestEnvelope);
242 RequestEnvelope requestEnvelope = (RequestEnvelope) sliceOptions.getValue().getMessage();
243 assertEquals(request, requestEnvelope.getMessage());
245 final Request<?, ?> request2 = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 1L, probe.ref());
246 transmitted = queue.transmit(new ConnectionEntry(request2, mockConsumer, now), now);
247 assertFalse(transmitted.isPresent());
251 public void testSlicingFailureOnTransmit() {
252 doAnswer(invocation -> {
253 invocation.<SliceOptions>getArgument(0).getOnFailureCallback().accept(new Exception("mock"));
254 return Boolean.FALSE;
255 }).when(mockMessageSlicer).slice(any());
257 ModifyTransactionRequestBuilder reqBuilder = new ModifyTransactionRequestBuilder(
258 TRANSACTION_IDENTIFIER, probe.ref());
259 reqBuilder.setSequence(0L);
261 final long now = now();
262 Optional<TransmittedConnectionEntry> transmitted =
263 queue.transmit(new ConnectionEntry(reqBuilder.build(), createConsumerMock(), now), now);
264 assertTrue(transmitted.isPresent());
266 verify(mockMessageSlicer).slice(any());
268 probe.expectMsgClass(FailureEnvelope.class);
272 public void testSlicedRequestOnComplete() {
273 doReturn(true).when(mockMessageSlicer).slice(any());
275 ModifyTransactionRequestBuilder reqBuilder = new ModifyTransactionRequestBuilder(
276 TRANSACTION_IDENTIFIER, probe.ref());
277 reqBuilder.setSequence(0L);
278 final Request<?, ?> request = reqBuilder.build();
280 final long now = now();
281 final Consumer<Response<?, ?>> mockConsumer = createConsumerMock();
282 queue.enqueueOrForward(new ConnectionEntry(request, mockConsumer, now), now);
284 ArgumentCaptor<SliceOptions> sliceOptions = ArgumentCaptor.forClass(SliceOptions.class);
285 verify(mockMessageSlicer).slice(sliceOptions.capture());
286 assertTrue(sliceOptions.getValue().getMessage() instanceof RequestEnvelope);
288 final Request<?, ?> request2 = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 1L, probe.ref());
289 queue.enqueueOrForward(new ConnectionEntry(request2, mockConsumer, now), now);
290 verifyNoMoreInteractions(mockMessageSlicer);
291 probe.expectNoMessage();
293 RequestEnvelope requestEnvelope = (RequestEnvelope) sliceOptions.getValue().getMessage();
294 queue.complete(new FailureEnvelope(request.toRequestFailure(mock(RequestException.class)),
295 requestEnvelope.getSessionId(), requestEnvelope.getTxSequence(), 0), 0);
297 requestEnvelope = probe.expectMsgClass(RequestEnvelope.class);
298 assertEquals(request2, requestEnvelope.getMessage());
300 final Request<?, ?> request3 = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 3L, probe.ref());
301 queue.enqueueOrForward(new ConnectionEntry(request3, mockConsumer, now), now);
303 requestEnvelope = probe.expectMsgClass(RequestEnvelope.class);
304 assertEquals(request3, requestEnvelope.getMessage());
307 private static void assertEqualRequests(final Collection<? extends ConnectionEntry> queue,
308 final Request<?, ?>... requests) {
309 final List<Request<?, ?>> queued = ImmutableList.copyOf(Collections2.transform(queue,
310 ConnectionEntry::getRequest));
311 assertEquals(Arrays.asList(requests), queued);