/*
 * Copyright (c) 2017 Pantheon Technologies s.r.o. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
8 package org.opendaylight.controller.cluster.access.client;
10 import static org.hamcrest.CoreMatchers.everyItem;
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertFalse;
13 import static org.junit.Assert.assertThat;
14 import static org.junit.Assert.assertTrue;
15 import static org.mockito.Mockito.mock;
16 import static org.mockito.Mockito.verify;
17 import static org.opendaylight.controller.cluster.access.client.ConnectionEntryMatcher.entryWithRequest;
19 import com.google.common.base.Ticker;
20 import com.google.common.collect.Collections2;
21 import com.google.common.collect.ImmutableList;
22 import com.google.common.collect.Iterables;
23 import com.google.common.testing.FakeTicker;
24 import java.util.Arrays;
25 import java.util.Collection;
26 import java.util.List;
27 import java.util.Optional;
28 import java.util.concurrent.TimeUnit;
29 import java.util.function.Consumer;
30 import org.junit.Test;
31 import org.opendaylight.controller.cluster.access.ABIVersion;
32 import org.opendaylight.controller.cluster.access.commands.TransactionPurgeRequest;
33 import org.opendaylight.controller.cluster.access.commands.TransactionPurgeResponse;
34 import org.opendaylight.controller.cluster.access.concepts.FailureEnvelope;
35 import org.opendaylight.controller.cluster.access.concepts.Request;
36 import org.opendaylight.controller.cluster.access.concepts.RequestEnvelope;
37 import org.opendaylight.controller.cluster.access.concepts.RequestException;
38 import org.opendaylight.controller.cluster.access.concepts.RequestSuccess;
39 import org.opendaylight.controller.cluster.access.concepts.Response;
40 import org.opendaylight.controller.cluster.access.concepts.SuccessEnvelope;
41 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
43 public class TransmittingTransmitQueueTest extends AbstractTransmitQueueTest<TransmitQueue.Transmitting> {
45 private BackendInfo backendInfo;
48 protected int getMaxInFlightMessages() {
49 return backendInfo.getMaxMessages();
53 protected TransmitQueue.Transmitting createQueue() {
54 backendInfo = new BackendInfo(probe.ref(), 0L, ABIVersion.BORON, 3);
55 return new TransmitQueue.Transmitting(0, backendInfo);
59 public void testComplete() throws Exception {
60 final long sequence1 = 0L;
61 final long sequence2 = 1L;
62 final Request<?, ?> request1 = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, sequence1, probe.ref());
63 final TransactionIdentifier transactionIdentifier2 = new TransactionIdentifier(HISTORY, 1L);
64 final Request<?, ?> request2 = new TransactionPurgeRequest(transactionIdentifier2, sequence2, probe.ref());
65 final Consumer<Response<?, ?>> callback1 = createConsumerMock();
66 final Consumer<Response<?, ?>> callback2 = createConsumerMock();
67 final long now1 = Ticker.systemTicker().read();
68 final long now2 = Ticker.systemTicker().read();
70 queue.enqueue(new ConnectionEntry(request1, callback1, now1), now1);
71 queue.enqueue(new ConnectionEntry(request2, callback2, now2), now2);
72 final RequestSuccess<?, ?> success1 = new TransactionPurgeResponse(TRANSACTION_IDENTIFIER, sequence1);
73 final RequestSuccess<?, ?> success2 = new TransactionPurgeResponse(transactionIdentifier2, sequence2);
74 //complete entries in different order
75 final Optional<TransmittedConnectionEntry> completed2 =
76 queue.complete(new SuccessEnvelope(success2, 0L, sequence2, 1L), now2);
77 final Optional<TransmittedConnectionEntry> completed1 =
78 queue.complete(new SuccessEnvelope(success1, 0L, sequence1, 1L), now1);
80 final TransmittedConnectionEntry transmittedEntry1 = completed1.orElseThrow(AssertionError::new);
81 assertEquals(transmittedEntry1.getRequest(), request1);
82 assertEquals(transmittedEntry1.getTxSequence(), sequence1);
83 assertEquals(transmittedEntry1.getCallback(), callback1);
85 final TransmittedConnectionEntry transmittedEntry2 = completed2.orElseThrow(AssertionError::new);
86 assertEquals(transmittedEntry2.getRequest(), request2);
87 assertEquals(transmittedEntry2.getTxSequence(), sequence2);
88 assertEquals(transmittedEntry2.getCallback(), callback2);
92 public void testEnqueueCanTransmit() throws Exception {
93 final Request<?, ?> request = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 0L, probe.ref());
94 final Consumer<Response<?, ?>> callback = createConsumerMock();
95 final long now = Ticker.systemTicker().read();
96 queue.enqueue(new ConnectionEntry(request, callback, now), now);
97 final RequestEnvelope requestEnvelope = probe.expectMsgClass(RequestEnvelope.class);
98 assertEquals(request, requestEnvelope.getMessage());
102 public void testEnqueueBackendFull() throws Exception {
103 final Request<?, ?> request = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 0L, probe.ref());
104 final Consumer<Response<?, ?>> callback = createConsumerMock();
105 final long now = Ticker.systemTicker().read();
106 final int sentMessages = getMaxInFlightMessages() + 1;
107 for (int i = 0; i < sentMessages; i++) {
108 queue.enqueue(new ConnectionEntry(request, callback, now), now);
110 for (int i = 0; i < getMaxInFlightMessages(); i++) {
111 probe.expectMsgClass(RequestEnvelope.class);
114 final Iterable<ConnectionEntry> entries = queue.asIterable();
115 assertEquals(sentMessages, Iterables.size(entries));
116 assertThat(entries, everyItem(entryWithRequest(request)));
121 public void testCanTransmitCount() throws Exception {
122 assertTrue(queue.canTransmitCount(getMaxInFlightMessages() - 1) > 0);
123 assertFalse(queue.canTransmitCount(getMaxInFlightMessages()) > 0);
128 public void testTransmit() throws Exception {
129 final Request<?, ?> request = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 0L, probe.ref());
130 final Consumer<Response<?, ?>> callback = createConsumerMock();
131 final long now = Ticker.systemTicker().read();
132 final ConnectionEntry entry = new ConnectionEntry(request, callback, now);
133 queue.transmit(entry, now);
134 final RequestEnvelope requestEnvelope = probe.expectMsgClass(RequestEnvelope.class);
135 assertEquals(request, requestEnvelope.getMessage());
139 public void testSetForwarder() throws Exception {
140 final FakeTicker ticker = new FakeTicker();
141 ticker.setAutoIncrementStep(1, TimeUnit.MICROSECONDS);
142 final Request<?, ?> request = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 0L, probe.ref());
143 final Consumer<Response<?, ?>> callback = createConsumerMock();
144 final ConnectionEntry entry = new ConnectionEntry(request, callback, ticker.read());
145 final ReconnectForwarder forwarder = mock(ReconnectForwarder.class);
146 queue.setForwarder(forwarder);
147 final long secondEnqueueNow = ticker.read();
148 queue.enqueue(entry, secondEnqueueNow);
149 verify(forwarder).forwardEntry(entry, secondEnqueueNow);
153 public void testCompleteOrdering() {
154 final Request<?, ?> req0 = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 0L, probe.ref());
155 final Request<?, ?> req1 = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 1L, probe.ref());
156 final Request<?, ?> req2 = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 2L, probe.ref());
157 final Request<?, ?> req3 = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 3L, probe.ref());
158 final Request<?, ?> req4 = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 4L, probe.ref());
159 final Request<?, ?> req5 = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 5L, probe.ref());
160 final Request<?, ?> req6 = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 6L, probe.ref());
161 final Consumer<Response<?, ?>> callback = createConsumerMock();
163 // Fill the queue up to capacity + 1
164 queue.enqueue(new ConnectionEntry(req0, callback, 0), 0);
165 queue.enqueue(new ConnectionEntry(req1, callback, 0), 0);
166 queue.enqueue(new ConnectionEntry(req2, callback, 0), 0);
167 queue.enqueue(new ConnectionEntry(req3, callback, 0), 0);
168 assertEqualRequests(queue.getInflight(), req0, req1, req2);
169 assertEqualRequests(queue.getPending(), req3);
171 // Now complete req0, which should transmit req3
172 queue.complete(new FailureEnvelope(req0.toRequestFailure(mock(RequestException.class)), 0, 0, 0), 0);
173 assertEqualRequests(queue.getInflight(), req1, req2, req3);
174 assertEqualRequests(queue.getPending());
176 // Now complete req1, which should leave an empty slot
177 queue.complete(new FailureEnvelope(req1.toRequestFailure(mock(RequestException.class)), 0, 1, 0), 0);
178 assertEqualRequests(queue.getInflight(), req2, req3);
179 assertEqualRequests(queue.getPending());
181 // Enqueue req4, which should be immediately transmitted
182 queue.enqueue(new ConnectionEntry(req4, callback, 0), 0);
183 assertEqualRequests(queue.getInflight(), req2, req3, req4);
184 assertEqualRequests(queue.getPending());
186 // Enqueue req5, which should move to pending
187 queue.enqueue(new ConnectionEntry(req5, callback, 0), 0);
188 assertEqualRequests(queue.getInflight(), req2, req3, req4);
189 assertEqualRequests(queue.getPending(), req5);
191 // Remove req4, creating an inconsistency...
192 queue.getInflight().removeLast();
193 assertEqualRequests(queue.getInflight(), req2, req3);
194 assertEqualRequests(queue.getPending(), req5);
196 // ... and enqueue req6, which should cause req5 to be transmitted
197 queue.enqueue(new ConnectionEntry(req6, callback, 0), 0);
198 assertEqualRequests(queue.getInflight(), req2, req3, req5);
199 assertEqualRequests(queue.getPending(), req6);
202 private static void assertEqualRequests(final Collection<? extends ConnectionEntry> queue,
203 final Request<?, ?>... requests) {
204 final List<Request<?, ?>> queued = ImmutableList.copyOf(Collections2.transform(queue,
205 ConnectionEntry::getRequest));
206 assertEquals(Arrays.asList(requests), queued);