BUG-8619: do not touch forward path during purge enqueue
[controller.git] / opendaylight / md-sal / cds-access-client / src / test / java / org / opendaylight / controller / cluster / access / client / TransmittingTransmitQueueTest.java
index 358cc9d408e9cbe7dc47d37e0c06244342f5f8d8..f7ea931365e819ab0529f6ada509be54c428ee32 100644 (file)
@@ -12,8 +12,6 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Matchers.isA;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
 import static org.opendaylight.controller.cluster.access.client.ConnectionEntryMatcher.entryWithRequest;
@@ -21,7 +19,6 @@ import static org.opendaylight.controller.cluster.access.client.ConnectionEntryM
 import com.google.common.base.Ticker;
 import com.google.common.collect.Collections2;
 import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Iterables;
 import com.google.common.testing.FakeTicker;
 import java.util.Arrays;
 import java.util.Collection;
@@ -46,6 +43,10 @@ public class TransmittingTransmitQueueTest extends AbstractTransmitQueueTest<Tra
 
     private BackendInfo backendInfo;
 
+    private static long now() {
+        return Ticker.systemTicker().read();
+    }
+
     @Override
     protected int getMaxInFlightMessages() {
         return backendInfo.getMaxMessages();
@@ -54,7 +55,7 @@ public class TransmittingTransmitQueueTest extends AbstractTransmitQueueTest<Tra
     @Override
     protected TransmitQueue.Transmitting createQueue() {
         backendInfo = new BackendInfo(probe.ref(), 0L, ABIVersion.BORON, 3);
-        return new TransmitQueue.Transmitting(0, backendInfo);
+        return new TransmitQueue.Transmitting(new TransmitQueue.Halted(0), 0, backendInfo, now());
     }
 
     @Test
@@ -66,11 +67,11 @@ public class TransmittingTransmitQueueTest extends AbstractTransmitQueueTest<Tra
         final Request<?, ?> request2 = new TransactionPurgeRequest(transactionIdentifier2, sequence2, probe.ref());
         final Consumer<Response<?, ?>> callback1 = createConsumerMock();
         final Consumer<Response<?, ?>> callback2 = createConsumerMock();
-        final long now1 = Ticker.systemTicker().read();
-        final long now2 = Ticker.systemTicker().read();
+        final long now1 = now();
+        final long now2 = now();
         //enqueue 2 entries
-        queue.enqueue(new ConnectionEntry(request1, callback1, now1), now1);
-        queue.enqueue(new ConnectionEntry(request2, callback2, now2), now2);
+        queue.enqueueOrForward(new ConnectionEntry(request1, callback1, now1), now1);
+        queue.enqueueOrForward(new ConnectionEntry(request2, callback2, now2), now2);
         final RequestSuccess<?, ?> success1 = new TransactionPurgeResponse(TRANSACTION_IDENTIFIER, sequence1);
         final RequestSuccess<?, ?> success2 = new TransactionPurgeResponse(transactionIdentifier2, sequence2);
         //complete entries in different order
@@ -94,8 +95,8 @@ public class TransmittingTransmitQueueTest extends AbstractTransmitQueueTest<Tra
     public void testEnqueueCanTransmit() throws Exception {
         final Request<?, ?> request = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 0L, probe.ref());
         final Consumer<Response<?, ?>> callback = createConsumerMock();
-        final long now = Ticker.systemTicker().read();
-        queue.enqueue(new ConnectionEntry(request, callback, now), now);
+        final long now = now();
+        queue.enqueueOrForward(new ConnectionEntry(request, callback, now), now);
         final RequestEnvelope requestEnvelope = probe.expectMsgClass(RequestEnvelope.class);
         assertEquals(request, requestEnvelope.getMessage());
     }
@@ -104,17 +105,17 @@ public class TransmittingTransmitQueueTest extends AbstractTransmitQueueTest<Tra
     public void testEnqueueBackendFull() throws Exception {
         final Request<?, ?> request = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 0L, probe.ref());
         final Consumer<Response<?, ?>> callback = createConsumerMock();
-        final long now = Ticker.systemTicker().read();
+        final long now = now();
         final int sentMessages = getMaxInFlightMessages() + 1;
         for (int i = 0; i < sentMessages; i++) {
-            queue.enqueue(new ConnectionEntry(request, callback, now), now);
+            queue.enqueueOrForward(new ConnectionEntry(request, callback, now), now);
         }
         for (int i = 0; i < getMaxInFlightMessages(); i++) {
             probe.expectMsgClass(RequestEnvelope.class);
         }
         probe.expectNoMsg();
-        final Iterable<ConnectionEntry> entries = queue.asIterable();
-        assertEquals(sentMessages, Iterables.size(entries));
+        final Collection<ConnectionEntry> entries = queue.drain();
+        assertEquals(sentMessages, entries.size());
         assertThat(entries, everyItem(entryWithRequest(request)));
     }
 
@@ -130,7 +131,7 @@ public class TransmittingTransmitQueueTest extends AbstractTransmitQueueTest<Tra
     public void testTransmit() throws Exception {
         final Request<?, ?> request = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 0L, probe.ref());
         final Consumer<Response<?, ?>> callback = createConsumerMock();
-        final long now = Ticker.systemTicker().read();
+        final long now = now();
         final ConnectionEntry entry = new ConnectionEntry(request, callback, now);
         queue.transmit(entry, now);
         final RequestEnvelope requestEnvelope = probe.expectMsgClass(RequestEnvelope.class);
@@ -144,13 +145,10 @@ public class TransmittingTransmitQueueTest extends AbstractTransmitQueueTest<Tra
         final Request<?, ?> request = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 0L, probe.ref());
         final Consumer<Response<?, ?>> callback = createConsumerMock();
         final ConnectionEntry entry = new ConnectionEntry(request, callback, ticker.read());
-        queue.enqueue(entry, ticker.read());
         final ReconnectForwarder forwarder = mock(ReconnectForwarder.class);
-        final long setForwarderNow = ticker.read();
-        queue.setForwarder(forwarder, setForwarderNow);
-        verify(forwarder).forwardEntry(isA(TransmittedConnectionEntry.class), eq(setForwarderNow));
+        queue.setForwarder(forwarder, ticker.read());
         final long secondEnqueueNow = ticker.read();
-        queue.enqueue(entry, secondEnqueueNow);
+        queue.enqueueOrForward(entry, secondEnqueueNow);
         verify(forwarder).forwardEntry(entry, secondEnqueueNow);
     }
 
@@ -166,10 +164,10 @@ public class TransmittingTransmitQueueTest extends AbstractTransmitQueueTest<Tra
         final Consumer<Response<?, ?>> callback = createConsumerMock();
 
         // Fill the queue up to capacity + 1
-        queue.enqueue(new ConnectionEntry(req0, callback, 0), 0);
-        queue.enqueue(new ConnectionEntry(req1, callback, 0), 0);
-        queue.enqueue(new ConnectionEntry(req2, callback, 0), 0);
-        queue.enqueue(new ConnectionEntry(req3, callback, 0), 0);
+        queue.enqueueOrForward(new ConnectionEntry(req0, callback, 0), 0);
+        queue.enqueueOrForward(new ConnectionEntry(req1, callback, 0), 0);
+        queue.enqueueOrForward(new ConnectionEntry(req2, callback, 0), 0);
+        queue.enqueueOrForward(new ConnectionEntry(req3, callback, 0), 0);
         assertEqualRequests(queue.getInflight(), req0, req1, req2);
         assertEqualRequests(queue.getPending(), req3);
 
@@ -184,12 +182,12 @@ public class TransmittingTransmitQueueTest extends AbstractTransmitQueueTest<Tra
         assertEqualRequests(queue.getPending());
 
         // Enqueue req4, which should be immediately transmitted
-        queue.enqueue(new ConnectionEntry(req4, callback, 0), 0);
+        queue.enqueueOrForward(new ConnectionEntry(req4, callback, 0), 0);
         assertEqualRequests(queue.getInflight(), req2, req3, req4);
         assertEqualRequests(queue.getPending());
 
         // Enqueue req5, which should move to pending
-        queue.enqueue(new ConnectionEntry(req5, callback, 0), 0);
+        queue.enqueueOrForward(new ConnectionEntry(req5, callback, 0), 0);
         assertEqualRequests(queue.getInflight(), req2, req3, req4);
         assertEqualRequests(queue.getPending(), req5);
 
@@ -199,7 +197,7 @@ public class TransmittingTransmitQueueTest extends AbstractTransmitQueueTest<Tra
         assertEqualRequests(queue.getPending(), req5);
 
         // ... and enqueue req6, which should cause req5 to be transmitted
-        queue.enqueue(new ConnectionEntry(req6, callback, 0), 0);
+        queue.enqueueOrForward(new ConnectionEntry(req6, callback, 0), 0);
         assertEqualRequests(queue.getInflight(), req2, req3, req5);
         assertEqualRequests(queue.getPending(), req6);
     }