BUG-8619: do not touch forward path during purge enqueue
[controller.git] / opendaylight / md-sal / cds-access-client / src / test / java / org / opendaylight / controller / cluster / access / client / TransmittingTransmitQueueTest.java
1 /*
2  * Copyright (c) 2017 Pantheon Technologies s.r.o. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.access.client;
9
10 import static org.hamcrest.CoreMatchers.everyItem;
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertFalse;
13 import static org.junit.Assert.assertThat;
14 import static org.junit.Assert.assertTrue;
15 import static org.mockito.Mockito.mock;
16 import static org.mockito.Mockito.verify;
17 import static org.opendaylight.controller.cluster.access.client.ConnectionEntryMatcher.entryWithRequest;
18
19 import com.google.common.base.Ticker;
20 import com.google.common.collect.Collections2;
21 import com.google.common.collect.ImmutableList;
22 import com.google.common.testing.FakeTicker;
23 import java.util.Arrays;
24 import java.util.Collection;
25 import java.util.List;
26 import java.util.Optional;
27 import java.util.concurrent.TimeUnit;
28 import java.util.function.Consumer;
29 import org.junit.Test;
30 import org.opendaylight.controller.cluster.access.ABIVersion;
31 import org.opendaylight.controller.cluster.access.commands.TransactionPurgeRequest;
32 import org.opendaylight.controller.cluster.access.commands.TransactionPurgeResponse;
33 import org.opendaylight.controller.cluster.access.concepts.FailureEnvelope;
34 import org.opendaylight.controller.cluster.access.concepts.Request;
35 import org.opendaylight.controller.cluster.access.concepts.RequestEnvelope;
36 import org.opendaylight.controller.cluster.access.concepts.RequestException;
37 import org.opendaylight.controller.cluster.access.concepts.RequestSuccess;
38 import org.opendaylight.controller.cluster.access.concepts.Response;
39 import org.opendaylight.controller.cluster.access.concepts.SuccessEnvelope;
40 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
41
42 public class TransmittingTransmitQueueTest extends AbstractTransmitQueueTest<TransmitQueue.Transmitting> {
43
44     private BackendInfo backendInfo;
45
46     private static long now() {
47         return Ticker.systemTicker().read();
48     }
49
50     @Override
51     protected int getMaxInFlightMessages() {
52         return backendInfo.getMaxMessages();
53     }
54
55     @Override
56     protected TransmitQueue.Transmitting createQueue() {
57         backendInfo = new BackendInfo(probe.ref(), 0L, ABIVersion.BORON, 3);
58         return new TransmitQueue.Transmitting(new TransmitQueue.Halted(0), 0, backendInfo, now());
59     }
60
61     @Test
62     public void testComplete() throws Exception {
63         final long sequence1 = 0L;
64         final long sequence2 = 1L;
65         final Request<?, ?> request1 = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, sequence1, probe.ref());
66         final TransactionIdentifier transactionIdentifier2 = new TransactionIdentifier(HISTORY, 1L);
67         final Request<?, ?> request2 = new TransactionPurgeRequest(transactionIdentifier2, sequence2, probe.ref());
68         final Consumer<Response<?, ?>> callback1 = createConsumerMock();
69         final Consumer<Response<?, ?>> callback2 = createConsumerMock();
70         final long now1 = now();
71         final long now2 = now();
72         //enqueue 2 entries
73         queue.enqueueOrForward(new ConnectionEntry(request1, callback1, now1), now1);
74         queue.enqueueOrForward(new ConnectionEntry(request2, callback2, now2), now2);
75         final RequestSuccess<?, ?> success1 = new TransactionPurgeResponse(TRANSACTION_IDENTIFIER, sequence1);
76         final RequestSuccess<?, ?> success2 = new TransactionPurgeResponse(transactionIdentifier2, sequence2);
77         //complete entries in different order
78         final Optional<TransmittedConnectionEntry> completed2 =
79                 queue.complete(new SuccessEnvelope(success2, 0L, sequence2, 1L), now2);
80         final Optional<TransmittedConnectionEntry> completed1 =
81                 queue.complete(new SuccessEnvelope(success1, 0L, sequence1, 1L), now1);
82         //check first entry
83         final TransmittedConnectionEntry transmittedEntry1 = completed1.orElseThrow(AssertionError::new);
84         assertEquals(transmittedEntry1.getRequest(), request1);
85         assertEquals(transmittedEntry1.getTxSequence(), sequence1);
86         assertEquals(transmittedEntry1.getCallback(), callback1);
87         //check second entry
88         final TransmittedConnectionEntry transmittedEntry2 = completed2.orElseThrow(AssertionError::new);
89         assertEquals(transmittedEntry2.getRequest(), request2);
90         assertEquals(transmittedEntry2.getTxSequence(), sequence2);
91         assertEquals(transmittedEntry2.getCallback(), callback2);
92     }
93
94     @Test
95     public void testEnqueueCanTransmit() throws Exception {
96         final Request<?, ?> request = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 0L, probe.ref());
97         final Consumer<Response<?, ?>> callback = createConsumerMock();
98         final long now = now();
99         queue.enqueueOrForward(new ConnectionEntry(request, callback, now), now);
100         final RequestEnvelope requestEnvelope = probe.expectMsgClass(RequestEnvelope.class);
101         assertEquals(request, requestEnvelope.getMessage());
102     }
103
104     @Test
105     public void testEnqueueBackendFull() throws Exception {
106         final Request<?, ?> request = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 0L, probe.ref());
107         final Consumer<Response<?, ?>> callback = createConsumerMock();
108         final long now = now();
109         final int sentMessages = getMaxInFlightMessages() + 1;
110         for (int i = 0; i < sentMessages; i++) {
111             queue.enqueueOrForward(new ConnectionEntry(request, callback, now), now);
112         }
113         for (int i = 0; i < getMaxInFlightMessages(); i++) {
114             probe.expectMsgClass(RequestEnvelope.class);
115         }
116         probe.expectNoMsg();
117         final Collection<ConnectionEntry> entries = queue.drain();
118         assertEquals(sentMessages, entries.size());
119         assertThat(entries, everyItem(entryWithRequest(request)));
120     }
121
122     @Test
123     @Override
124     public void testCanTransmitCount() throws Exception {
125         assertTrue(queue.canTransmitCount(getMaxInFlightMessages() - 1) > 0);
126         assertFalse(queue.canTransmitCount(getMaxInFlightMessages()) > 0);
127     }
128
129     @Test
130     @Override
131     public void testTransmit() throws Exception {
132         final Request<?, ?> request = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 0L, probe.ref());
133         final Consumer<Response<?, ?>> callback = createConsumerMock();
134         final long now = now();
135         final ConnectionEntry entry = new ConnectionEntry(request, callback, now);
136         queue.transmit(entry, now);
137         final RequestEnvelope requestEnvelope = probe.expectMsgClass(RequestEnvelope.class);
138         assertEquals(request, requestEnvelope.getMessage());
139     }
140
141     @Test
142     public void testSetForwarder() throws Exception {
143         final FakeTicker ticker = new FakeTicker();
144         ticker.setAutoIncrementStep(1, TimeUnit.MICROSECONDS);
145         final Request<?, ?> request = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 0L, probe.ref());
146         final Consumer<Response<?, ?>> callback = createConsumerMock();
147         final ConnectionEntry entry = new ConnectionEntry(request, callback, ticker.read());
148         final ReconnectForwarder forwarder = mock(ReconnectForwarder.class);
149         queue.setForwarder(forwarder, ticker.read());
150         final long secondEnqueueNow = ticker.read();
151         queue.enqueueOrForward(entry, secondEnqueueNow);
152         verify(forwarder).forwardEntry(entry, secondEnqueueNow);
153     }
154
155     @Test
156     public void testCompleteOrdering() {
157         final Request<?, ?> req0 = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 0L, probe.ref());
158         final Request<?, ?> req1 = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 1L, probe.ref());
159         final Request<?, ?> req2 = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 2L, probe.ref());
160         final Request<?, ?> req3 = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 3L, probe.ref());
161         final Request<?, ?> req4 = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 4L, probe.ref());
162         final Request<?, ?> req5 = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 5L, probe.ref());
163         final Request<?, ?> req6 = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 6L, probe.ref());
164         final Consumer<Response<?, ?>> callback = createConsumerMock();
165
166         // Fill the queue up to capacity + 1
167         queue.enqueueOrForward(new ConnectionEntry(req0, callback, 0), 0);
168         queue.enqueueOrForward(new ConnectionEntry(req1, callback, 0), 0);
169         queue.enqueueOrForward(new ConnectionEntry(req2, callback, 0), 0);
170         queue.enqueueOrForward(new ConnectionEntry(req3, callback, 0), 0);
171         assertEqualRequests(queue.getInflight(), req0, req1, req2);
172         assertEqualRequests(queue.getPending(), req3);
173
174         // Now complete req0, which should transmit req3
175         queue.complete(new FailureEnvelope(req0.toRequestFailure(mock(RequestException.class)), 0, 0, 0), 0);
176         assertEqualRequests(queue.getInflight(), req1, req2, req3);
177         assertEqualRequests(queue.getPending());
178
179         // Now complete req1, which should leave an empty slot
180         queue.complete(new FailureEnvelope(req1.toRequestFailure(mock(RequestException.class)), 0, 1, 0), 0);
181         assertEqualRequests(queue.getInflight(), req2, req3);
182         assertEqualRequests(queue.getPending());
183
184         // Enqueue req4, which should be immediately transmitted
185         queue.enqueueOrForward(new ConnectionEntry(req4, callback, 0), 0);
186         assertEqualRequests(queue.getInflight(), req2, req3, req4);
187         assertEqualRequests(queue.getPending());
188
189         // Enqueue req5, which should move to pending
190         queue.enqueueOrForward(new ConnectionEntry(req5, callback, 0), 0);
191         assertEqualRequests(queue.getInflight(), req2, req3, req4);
192         assertEqualRequests(queue.getPending(), req5);
193
194         // Remove req4, creating an inconsistency...
195         queue.getInflight().removeLast();
196         assertEqualRequests(queue.getInflight(), req2, req3);
197         assertEqualRequests(queue.getPending(), req5);
198
199         // ... and enqueue req6, which should cause req5 to be transmitted
200         queue.enqueueOrForward(new ConnectionEntry(req6, callback, 0), 0);
201         assertEqualRequests(queue.getInflight(), req2, req3, req5);
202         assertEqualRequests(queue.getPending(), req6);
203     }
204
205     private static void assertEqualRequests(final Collection<? extends ConnectionEntry> queue,
206             final Request<?, ?>... requests) {
207         final List<Request<?, ?>> queued = ImmutableList.copyOf(Collections2.transform(queue,
208             ConnectionEntry::getRequest));
209         assertEquals(Arrays.asList(requests), queued);
210     }
211 }