358cc9d408e9cbe7dc47d37e0c06244342f5f8d8
[controller.git] / opendaylight / md-sal / cds-access-client / src / test / java / org / opendaylight / controller / cluster / access / client / TransmittingTransmitQueueTest.java
1 /*
2  * Copyright (c) 2017 Pantheon Technologies s.r.o. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.access.client;
9
10 import static org.hamcrest.CoreMatchers.everyItem;
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertFalse;
13 import static org.junit.Assert.assertThat;
14 import static org.junit.Assert.assertTrue;
15 import static org.mockito.Matchers.eq;
16 import static org.mockito.Matchers.isA;
17 import static org.mockito.Mockito.mock;
18 import static org.mockito.Mockito.verify;
19 import static org.opendaylight.controller.cluster.access.client.ConnectionEntryMatcher.entryWithRequest;
20
21 import com.google.common.base.Ticker;
22 import com.google.common.collect.Collections2;
23 import com.google.common.collect.ImmutableList;
24 import com.google.common.collect.Iterables;
25 import com.google.common.testing.FakeTicker;
26 import java.util.Arrays;
27 import java.util.Collection;
28 import java.util.List;
29 import java.util.Optional;
30 import java.util.concurrent.TimeUnit;
31 import java.util.function.Consumer;
32 import org.junit.Test;
33 import org.opendaylight.controller.cluster.access.ABIVersion;
34 import org.opendaylight.controller.cluster.access.commands.TransactionPurgeRequest;
35 import org.opendaylight.controller.cluster.access.commands.TransactionPurgeResponse;
36 import org.opendaylight.controller.cluster.access.concepts.FailureEnvelope;
37 import org.opendaylight.controller.cluster.access.concepts.Request;
38 import org.opendaylight.controller.cluster.access.concepts.RequestEnvelope;
39 import org.opendaylight.controller.cluster.access.concepts.RequestException;
40 import org.opendaylight.controller.cluster.access.concepts.RequestSuccess;
41 import org.opendaylight.controller.cluster.access.concepts.Response;
42 import org.opendaylight.controller.cluster.access.concepts.SuccessEnvelope;
43 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
44
45 public class TransmittingTransmitQueueTest extends AbstractTransmitQueueTest<TransmitQueue.Transmitting> {
46
47     private BackendInfo backendInfo;
48
49     @Override
50     protected int getMaxInFlightMessages() {
51         return backendInfo.getMaxMessages();
52     }
53
54     @Override
55     protected TransmitQueue.Transmitting createQueue() {
56         backendInfo = new BackendInfo(probe.ref(), 0L, ABIVersion.BORON, 3);
57         return new TransmitQueue.Transmitting(0, backendInfo);
58     }
59
60     @Test
61     public void testComplete() throws Exception {
62         final long sequence1 = 0L;
63         final long sequence2 = 1L;
64         final Request<?, ?> request1 = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, sequence1, probe.ref());
65         final TransactionIdentifier transactionIdentifier2 = new TransactionIdentifier(HISTORY, 1L);
66         final Request<?, ?> request2 = new TransactionPurgeRequest(transactionIdentifier2, sequence2, probe.ref());
67         final Consumer<Response<?, ?>> callback1 = createConsumerMock();
68         final Consumer<Response<?, ?>> callback2 = createConsumerMock();
69         final long now1 = Ticker.systemTicker().read();
70         final long now2 = Ticker.systemTicker().read();
71         //enqueue 2 entries
72         queue.enqueue(new ConnectionEntry(request1, callback1, now1), now1);
73         queue.enqueue(new ConnectionEntry(request2, callback2, now2), now2);
74         final RequestSuccess<?, ?> success1 = new TransactionPurgeResponse(TRANSACTION_IDENTIFIER, sequence1);
75         final RequestSuccess<?, ?> success2 = new TransactionPurgeResponse(transactionIdentifier2, sequence2);
76         //complete entries in different order
77         final Optional<TransmittedConnectionEntry> completed2 =
78                 queue.complete(new SuccessEnvelope(success2, 0L, sequence2, 1L), now2);
79         final Optional<TransmittedConnectionEntry> completed1 =
80                 queue.complete(new SuccessEnvelope(success1, 0L, sequence1, 1L), now1);
81         //check first entry
82         final TransmittedConnectionEntry transmittedEntry1 = completed1.orElseThrow(AssertionError::new);
83         assertEquals(transmittedEntry1.getRequest(), request1);
84         assertEquals(transmittedEntry1.getTxSequence(), sequence1);
85         assertEquals(transmittedEntry1.getCallback(), callback1);
86         //check second entry
87         final TransmittedConnectionEntry transmittedEntry2 = completed2.orElseThrow(AssertionError::new);
88         assertEquals(transmittedEntry2.getRequest(), request2);
89         assertEquals(transmittedEntry2.getTxSequence(), sequence2);
90         assertEquals(transmittedEntry2.getCallback(), callback2);
91     }
92
93     @Test
94     public void testEnqueueCanTransmit() throws Exception {
95         final Request<?, ?> request = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 0L, probe.ref());
96         final Consumer<Response<?, ?>> callback = createConsumerMock();
97         final long now = Ticker.systemTicker().read();
98         queue.enqueue(new ConnectionEntry(request, callback, now), now);
99         final RequestEnvelope requestEnvelope = probe.expectMsgClass(RequestEnvelope.class);
100         assertEquals(request, requestEnvelope.getMessage());
101     }
102
103     @Test
104     public void testEnqueueBackendFull() throws Exception {
105         final Request<?, ?> request = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 0L, probe.ref());
106         final Consumer<Response<?, ?>> callback = createConsumerMock();
107         final long now = Ticker.systemTicker().read();
108         final int sentMessages = getMaxInFlightMessages() + 1;
109         for (int i = 0; i < sentMessages; i++) {
110             queue.enqueue(new ConnectionEntry(request, callback, now), now);
111         }
112         for (int i = 0; i < getMaxInFlightMessages(); i++) {
113             probe.expectMsgClass(RequestEnvelope.class);
114         }
115         probe.expectNoMsg();
116         final Iterable<ConnectionEntry> entries = queue.asIterable();
117         assertEquals(sentMessages, Iterables.size(entries));
118         assertThat(entries, everyItem(entryWithRequest(request)));
119     }
120
121     @Test
122     @Override
123     public void testCanTransmitCount() throws Exception {
124         assertTrue(queue.canTransmitCount(getMaxInFlightMessages() - 1) > 0);
125         assertFalse(queue.canTransmitCount(getMaxInFlightMessages()) > 0);
126     }
127
128     @Test
129     @Override
130     public void testTransmit() throws Exception {
131         final Request<?, ?> request = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 0L, probe.ref());
132         final Consumer<Response<?, ?>> callback = createConsumerMock();
133         final long now = Ticker.systemTicker().read();
134         final ConnectionEntry entry = new ConnectionEntry(request, callback, now);
135         queue.transmit(entry, now);
136         final RequestEnvelope requestEnvelope = probe.expectMsgClass(RequestEnvelope.class);
137         assertEquals(request, requestEnvelope.getMessage());
138     }
139
140     @Test
141     public void testSetForwarder() throws Exception {
142         final FakeTicker ticker = new FakeTicker();
143         ticker.setAutoIncrementStep(1, TimeUnit.MICROSECONDS);
144         final Request<?, ?> request = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 0L, probe.ref());
145         final Consumer<Response<?, ?>> callback = createConsumerMock();
146         final ConnectionEntry entry = new ConnectionEntry(request, callback, ticker.read());
147         queue.enqueue(entry, ticker.read());
148         final ReconnectForwarder forwarder = mock(ReconnectForwarder.class);
149         final long setForwarderNow = ticker.read();
150         queue.setForwarder(forwarder, setForwarderNow);
151         verify(forwarder).forwardEntry(isA(TransmittedConnectionEntry.class), eq(setForwarderNow));
152         final long secondEnqueueNow = ticker.read();
153         queue.enqueue(entry, secondEnqueueNow);
154         verify(forwarder).forwardEntry(entry, secondEnqueueNow);
155     }
156
157     @Test
158     public void testCompleteOrdering() {
159         final Request<?, ?> req0 = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 0L, probe.ref());
160         final Request<?, ?> req1 = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 1L, probe.ref());
161         final Request<?, ?> req2 = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 2L, probe.ref());
162         final Request<?, ?> req3 = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 3L, probe.ref());
163         final Request<?, ?> req4 = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 4L, probe.ref());
164         final Request<?, ?> req5 = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 5L, probe.ref());
165         final Request<?, ?> req6 = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 6L, probe.ref());
166         final Consumer<Response<?, ?>> callback = createConsumerMock();
167
168         // Fill the queue up to capacity + 1
169         queue.enqueue(new ConnectionEntry(req0, callback, 0), 0);
170         queue.enqueue(new ConnectionEntry(req1, callback, 0), 0);
171         queue.enqueue(new ConnectionEntry(req2, callback, 0), 0);
172         queue.enqueue(new ConnectionEntry(req3, callback, 0), 0);
173         assertEqualRequests(queue.getInflight(), req0, req1, req2);
174         assertEqualRequests(queue.getPending(), req3);
175
176         // Now complete req0, which should transmit req3
177         queue.complete(new FailureEnvelope(req0.toRequestFailure(mock(RequestException.class)), 0, 0, 0), 0);
178         assertEqualRequests(queue.getInflight(), req1, req2, req3);
179         assertEqualRequests(queue.getPending());
180
181         // Now complete req1, which should leave an empty slot
182         queue.complete(new FailureEnvelope(req1.toRequestFailure(mock(RequestException.class)), 0, 1, 0), 0);
183         assertEqualRequests(queue.getInflight(), req2, req3);
184         assertEqualRequests(queue.getPending());
185
186         // Enqueue req4, which should be immediately transmitted
187         queue.enqueue(new ConnectionEntry(req4, callback, 0), 0);
188         assertEqualRequests(queue.getInflight(), req2, req3, req4);
189         assertEqualRequests(queue.getPending());
190
191         // Enqueue req5, which should move to pending
192         queue.enqueue(new ConnectionEntry(req5, callback, 0), 0);
193         assertEqualRequests(queue.getInflight(), req2, req3, req4);
194         assertEqualRequests(queue.getPending(), req5);
195
196         // Remove req4, creating an inconsistency...
197         queue.getInflight().removeLast();
198         assertEqualRequests(queue.getInflight(), req2, req3);
199         assertEqualRequests(queue.getPending(), req5);
200
201         // ... and enqueue req6, which should cause req5 to be transmitted
202         queue.enqueue(new ConnectionEntry(req6, callback, 0), 0);
203         assertEqualRequests(queue.getInflight(), req2, req3, req5);
204         assertEqualRequests(queue.getPending(), req6);
205     }
206
207     private static void assertEqualRequests(final Collection<? extends ConnectionEntry> queue,
208             final Request<?, ?>... requests) {
209         final List<Request<?, ?>> queued = ImmutableList.copyOf(Collections2.transform(queue,
210             ConnectionEntry::getRequest));
211         assertEquals(Arrays.asList(requests), queued);
212     }
213 }