BUG-8402: fix transmit accounting
[controller.git] / opendaylight / md-sal / cds-access-client / src / test / java / org / opendaylight / controller / cluster / access / client / TransmittingTransmitQueueTest.java
index 4859591384bfd915e91dc9719e2cc0c8e447fb62..358cc9d408e9cbe7dc47d37e0c06244342f5f8d8 100644 (file)
@@ -8,6 +8,10 @@
 package org.opendaylight.controller.cluster.access.client;
 
 import static org.hamcrest.CoreMatchers.everyItem;
 package org.opendaylight.controller.cluster.access.client;
 
 import static org.hamcrest.CoreMatchers.everyItem;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Matchers.isA;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Matchers.isA;
 import static org.mockito.Mockito.mock;
@@ -15,18 +19,24 @@ import static org.mockito.Mockito.verify;
 import static org.opendaylight.controller.cluster.access.client.ConnectionEntryMatcher.entryWithRequest;
 
 import com.google.common.base.Ticker;
 import static org.opendaylight.controller.cluster.access.client.ConnectionEntryMatcher.entryWithRequest;
 
 import com.google.common.base.Ticker;
+import com.google.common.collect.Collections2;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
 import com.google.common.testing.FakeTicker;
 import com.google.common.collect.Iterables;
 import com.google.common.testing.FakeTicker;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
 import java.util.Optional;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
-import org.junit.Assert;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.access.ABIVersion;
 import org.opendaylight.controller.cluster.access.commands.TransactionPurgeRequest;
 import org.opendaylight.controller.cluster.access.commands.TransactionPurgeResponse;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.access.ABIVersion;
 import org.opendaylight.controller.cluster.access.commands.TransactionPurgeRequest;
 import org.opendaylight.controller.cluster.access.commands.TransactionPurgeResponse;
+import org.opendaylight.controller.cluster.access.concepts.FailureEnvelope;
 import org.opendaylight.controller.cluster.access.concepts.Request;
 import org.opendaylight.controller.cluster.access.concepts.RequestEnvelope;
 import org.opendaylight.controller.cluster.access.concepts.Request;
 import org.opendaylight.controller.cluster.access.concepts.RequestEnvelope;
+import org.opendaylight.controller.cluster.access.concepts.RequestException;
 import org.opendaylight.controller.cluster.access.concepts.RequestSuccess;
 import org.opendaylight.controller.cluster.access.concepts.Response;
 import org.opendaylight.controller.cluster.access.concepts.SuccessEnvelope;
 import org.opendaylight.controller.cluster.access.concepts.RequestSuccess;
 import org.opendaylight.controller.cluster.access.concepts.Response;
 import org.opendaylight.controller.cluster.access.concepts.SuccessEnvelope;
@@ -51,9 +61,9 @@ public class TransmittingTransmitQueueTest extends AbstractTransmitQueueTest<Tra
     public void testComplete() throws Exception {
         final long sequence1 = 0L;
         final long sequence2 = 1L;
     public void testComplete() throws Exception {
         final long sequence1 = 0L;
         final long sequence2 = 1L;
-        final Request request1 = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, sequence1, probe.ref());
+        final Request<?, ?> request1 = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, sequence1, probe.ref());
         final TransactionIdentifier transactionIdentifier2 = new TransactionIdentifier(HISTORY, 1L);
         final TransactionIdentifier transactionIdentifier2 = new TransactionIdentifier(HISTORY, 1L);
-        final Request request2 = new TransactionPurgeRequest(transactionIdentifier2, sequence2, probe.ref());
+        final Request<?, ?> request2 = new TransactionPurgeRequest(transactionIdentifier2, sequence2, probe.ref());
         final Consumer<Response<?, ?>> callback1 = createConsumerMock();
         final Consumer<Response<?, ?>> callback2 = createConsumerMock();
         final long now1 = Ticker.systemTicker().read();
         final Consumer<Response<?, ?>> callback1 = createConsumerMock();
         final Consumer<Response<?, ?>> callback2 = createConsumerMock();
         final long now1 = Ticker.systemTicker().read();
@@ -70,29 +80,29 @@ public class TransmittingTransmitQueueTest extends AbstractTransmitQueueTest<Tra
                 queue.complete(new SuccessEnvelope(success1, 0L, sequence1, 1L), now1);
         //check first entry
         final TransmittedConnectionEntry transmittedEntry1 = completed1.orElseThrow(AssertionError::new);
                 queue.complete(new SuccessEnvelope(success1, 0L, sequence1, 1L), now1);
         //check first entry
         final TransmittedConnectionEntry transmittedEntry1 = completed1.orElseThrow(AssertionError::new);
-        Assert.assertEquals(transmittedEntry1.getRequest(), request1);
-        Assert.assertEquals(transmittedEntry1.getTxSequence(), sequence1);
-        Assert.assertEquals(transmittedEntry1.getCallback(), callback1);
+        assertEquals(transmittedEntry1.getRequest(), request1);
+        assertEquals(transmittedEntry1.getTxSequence(), sequence1);
+        assertEquals(transmittedEntry1.getCallback(), callback1);
         //check second entry
         final TransmittedConnectionEntry transmittedEntry2 = completed2.orElseThrow(AssertionError::new);
         //check second entry
         final TransmittedConnectionEntry transmittedEntry2 = completed2.orElseThrow(AssertionError::new);
-        Assert.assertEquals(transmittedEntry2.getRequest(), request2);
-        Assert.assertEquals(transmittedEntry2.getTxSequence(), sequence2);
-        Assert.assertEquals(transmittedEntry2.getCallback(), callback2);
+        assertEquals(transmittedEntry2.getRequest(), request2);
+        assertEquals(transmittedEntry2.getTxSequence(), sequence2);
+        assertEquals(transmittedEntry2.getCallback(), callback2);
     }
 
     @Test
     public void testEnqueueCanTransmit() throws Exception {
     }
 
     @Test
     public void testEnqueueCanTransmit() throws Exception {
-        final Request request = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 0L, probe.ref());
+        final Request<?, ?> request = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 0L, probe.ref());
         final Consumer<Response<?, ?>> callback = createConsumerMock();
         final long now = Ticker.systemTicker().read();
         queue.enqueue(new ConnectionEntry(request, callback, now), now);
         final RequestEnvelope requestEnvelope = probe.expectMsgClass(RequestEnvelope.class);
         final Consumer<Response<?, ?>> callback = createConsumerMock();
         final long now = Ticker.systemTicker().read();
         queue.enqueue(new ConnectionEntry(request, callback, now), now);
         final RequestEnvelope requestEnvelope = probe.expectMsgClass(RequestEnvelope.class);
-        Assert.assertEquals(request, requestEnvelope.getMessage());
+        assertEquals(request, requestEnvelope.getMessage());
     }
 
     @Test
     public void testEnqueueBackendFull() throws Exception {
     }
 
     @Test
     public void testEnqueueBackendFull() throws Exception {
-        final Request request = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 0L, probe.ref());
+        final Request<?, ?> request = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 0L, probe.ref());
         final Consumer<Response<?, ?>> callback = createConsumerMock();
         final long now = Ticker.systemTicker().read();
         final int sentMessages = getMaxInFlightMessages() + 1;
         final Consumer<Response<?, ?>> callback = createConsumerMock();
         final long now = Ticker.systemTicker().read();
         final int sentMessages = getMaxInFlightMessages() + 1;
@@ -104,34 +114,34 @@ public class TransmittingTransmitQueueTest extends AbstractTransmitQueueTest<Tra
         }
         probe.expectNoMsg();
         final Iterable<ConnectionEntry> entries = queue.asIterable();
         }
         probe.expectNoMsg();
         final Iterable<ConnectionEntry> entries = queue.asIterable();
-        Assert.assertEquals(sentMessages, Iterables.size(entries));
-        Assert.assertThat(entries, everyItem(entryWithRequest(request)));
+        assertEquals(sentMessages, Iterables.size(entries));
+        assertThat(entries, everyItem(entryWithRequest(request)));
     }
 
     @Test
     @Override
     public void testCanTransmitCount() throws Exception {
     }
 
     @Test
     @Override
     public void testCanTransmitCount() throws Exception {
-        Assert.assertTrue(queue.canTransmitCount(getMaxInFlightMessages() - 1) > 0);
-        Assert.assertFalse(queue.canTransmitCount(getMaxInFlightMessages()) > 0);
+        assertTrue(queue.canTransmitCount(getMaxInFlightMessages() - 1) > 0);
+        assertFalse(queue.canTransmitCount(getMaxInFlightMessages()) > 0);
     }
 
     @Test
     @Override
     public void testTransmit() throws Exception {
     }
 
     @Test
     @Override
     public void testTransmit() throws Exception {
-        final Request request = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 0L, probe.ref());
+        final Request<?, ?> request = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 0L, probe.ref());
         final Consumer<Response<?, ?>> callback = createConsumerMock();
         final long now = Ticker.systemTicker().read();
         final ConnectionEntry entry = new ConnectionEntry(request, callback, now);
         queue.transmit(entry, now);
         final RequestEnvelope requestEnvelope = probe.expectMsgClass(RequestEnvelope.class);
         final Consumer<Response<?, ?>> callback = createConsumerMock();
         final long now = Ticker.systemTicker().read();
         final ConnectionEntry entry = new ConnectionEntry(request, callback, now);
         queue.transmit(entry, now);
         final RequestEnvelope requestEnvelope = probe.expectMsgClass(RequestEnvelope.class);
-        Assert.assertEquals(request, requestEnvelope.getMessage());
+        assertEquals(request, requestEnvelope.getMessage());
     }
 
     @Test
     public void testSetForwarder() throws Exception {
         final FakeTicker ticker = new FakeTicker();
         ticker.setAutoIncrementStep(1, TimeUnit.MICROSECONDS);
     }
 
     @Test
     public void testSetForwarder() throws Exception {
         final FakeTicker ticker = new FakeTicker();
         ticker.setAutoIncrementStep(1, TimeUnit.MICROSECONDS);
-        final Request request = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 0L, probe.ref());
+        final Request<?, ?> request = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 0L, probe.ref());
         final Consumer<Response<?, ?>> callback = createConsumerMock();
         final ConnectionEntry entry = new ConnectionEntry(request, callback, ticker.read());
         queue.enqueue(entry, ticker.read());
         final Consumer<Response<?, ?>> callback = createConsumerMock();
         final ConnectionEntry entry = new ConnectionEntry(request, callback, ticker.read());
         queue.enqueue(entry, ticker.read());
@@ -144,4 +154,60 @@ public class TransmittingTransmitQueueTest extends AbstractTransmitQueueTest<Tra
         verify(forwarder).forwardEntry(entry, secondEnqueueNow);
     }
 
         verify(forwarder).forwardEntry(entry, secondEnqueueNow);
     }
 
-}
\ No newline at end of file
+    @Test
+    public void testCompleteOrdering() {
+        final Request<?, ?> req0 = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 0L, probe.ref());
+        final Request<?, ?> req1 = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 1L, probe.ref());
+        final Request<?, ?> req2 = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 2L, probe.ref());
+        final Request<?, ?> req3 = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 3L, probe.ref());
+        final Request<?, ?> req4 = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 4L, probe.ref());
+        final Request<?, ?> req5 = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 5L, probe.ref());
+        final Request<?, ?> req6 = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 6L, probe.ref());
+        final Consumer<Response<?, ?>> callback = createConsumerMock();
+
+        // Fill the queue up to capacity + 1
+        queue.enqueue(new ConnectionEntry(req0, callback, 0), 0);
+        queue.enqueue(new ConnectionEntry(req1, callback, 0), 0);
+        queue.enqueue(new ConnectionEntry(req2, callback, 0), 0);
+        queue.enqueue(new ConnectionEntry(req3, callback, 0), 0);
+        assertEqualRequests(queue.getInflight(), req0, req1, req2);
+        assertEqualRequests(queue.getPending(), req3);
+
+        // Now complete req0, which should transmit req3
+        queue.complete(new FailureEnvelope(req0.toRequestFailure(mock(RequestException.class)), 0, 0, 0), 0);
+        assertEqualRequests(queue.getInflight(), req1, req2, req3);
+        assertEqualRequests(queue.getPending());
+
+        // Now complete req1, which should leave an empty slot
+        queue.complete(new FailureEnvelope(req1.toRequestFailure(mock(RequestException.class)), 0, 1, 0), 0);
+        assertEqualRequests(queue.getInflight(), req2, req3);
+        assertEqualRequests(queue.getPending());
+
+        // Enqueue req4, which should be immediately transmitted
+        queue.enqueue(new ConnectionEntry(req4, callback, 0), 0);
+        assertEqualRequests(queue.getInflight(), req2, req3, req4);
+        assertEqualRequests(queue.getPending());
+
+        // Enqueue req5, which should move to pending
+        queue.enqueue(new ConnectionEntry(req5, callback, 0), 0);
+        assertEqualRequests(queue.getInflight(), req2, req3, req4);
+        assertEqualRequests(queue.getPending(), req5);
+
+        // Remove req4, creating an inconsistency...
+        queue.getInflight().removeLast();
+        assertEqualRequests(queue.getInflight(), req2, req3);
+        assertEqualRequests(queue.getPending(), req5);
+
+        // ... and enqueue req6, which should cause req5 to be transmitted
+        queue.enqueue(new ConnectionEntry(req6, callback, 0), 0);
+        assertEqualRequests(queue.getInflight(), req2, req3, req5);
+        assertEqualRequests(queue.getPending(), req6);
+    }
+
+    private static void assertEqualRequests(final Collection<? extends ConnectionEntry> queue,
+            final Request<?, ?>... requests) {
+        final List<Request<?, ?>> queued = ImmutableList.copyOf(Collections2.transform(queue,
+            ConnectionEntry::getRequest));
+        assertEquals(Arrays.asList(requests), queued);
+    }
+}