BUG-8619: do not touch forward path during purge enqueue 00/60700/1
authorRobert Varga <robert.varga@pantheon.tech>
Thu, 20 Jul 2017 15:51:52 +0000 (17:51 +0200)
committerRobert Varga <robert.varga@pantheon.tech>
Mon, 24 Jul 2017 20:45:31 +0000 (22:45 +0200)
In case of a purge request, the request is sent from the head
of a connection chain (i.e. the original connection which created
the transaction) and propagated via forwarders. This path needs
to make sure it does not go via throttling, as it is an internal
detail.

Separate the transmit paths a bit more, so that TransmitQueue
can push messages to forwarders' replay path.

Change-Id: I5e146b8d11e8654b4beae3959207efb9c2f18315
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
(cherry picked from commit b83c7f5e5cdaee5f250988182dccb749ac7432c2)

opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/AbstractClientConnection.java
opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/TransmitQueue.java
opendaylight/md-sal/cds-access-client/src/test/java/org/opendaylight/controller/cluster/access/client/AbstractTransmitQueueTest.java
opendaylight/md-sal/cds-access-client/src/test/java/org/opendaylight/controller/cluster/access/client/TransmittingTransmitQueueTest.java

index 1fa67632eab593e400511433ad2ef900798df726..3cc8d7073ae3467d04fed6468f1af943b7c15b0f 100644 (file)
@@ -167,24 +167,42 @@ public abstract class AbstractClientConnection<T extends BackendInfo> {
         enqueueEntry(new ConnectionEntry(request, callback, enqueuedTicks), currentTime());
     }
 
-    public final long enqueueEntry(final ConnectionEntry entry, final long now) {
+    private long enqueueOrForward(final ConnectionEntry entry, final long now) {
         lock.lock();
         try {
-            final RequestException maybePoison = poisoned;
-            if (maybePoison != null) {
-                throw new IllegalStateException("Connection " + this + " has been poisoned", maybePoison);
-            }
+            commonEnqueue(entry, now);
+            return queue.enqueueOrForward(entry, now);
+        } finally {
+            lock.unlock();
+        }
+    }
 
-            if (queue.isEmpty()) {
-                // The queue is becoming non-empty, schedule a timer.
-                scheduleTimer(entry.getEnqueuedTicks() + context.config().getRequestTimeout() - now);
-            }
-            return queue.enqueue(entry, now);
+    /**
+     * Enqueue an entry, possibly also transmitting it.
+     */
+    public final void enqueueEntry(final ConnectionEntry entry, final long now) {
+        lock.lock();
+        try {
+            commonEnqueue(entry, now);
+            queue.enqueueOrReplay(entry, now);
         } finally {
             lock.unlock();
         }
     }
 
+    @GuardedBy("lock")
+    private void commonEnqueue(final ConnectionEntry entry, final long now) {
+        final RequestException maybePoison = poisoned;
+        if (maybePoison != null) {
+            throw new IllegalStateException("Connection " + this + " has been poisoned", maybePoison);
+        }
+
+        if (queue.isEmpty()) {
+            // The queue is becoming non-empty, schedule a timer.
+            scheduleTimer(entry.getEnqueuedTicks() + context.config().getRequestTimeout() - now);
+        }
+    }
+
     // To be called from ClientActorBehavior on ConnectedClientConnection after entries are replayed.
     final void cancelDebt() {
         queue.cancelDebt(currentTime());
@@ -227,7 +245,7 @@ public abstract class AbstractClientConnection<T extends BackendInfo> {
             RequestException runtimeRequestException);
 
     final void sendEntry(final ConnectionEntry entry, final long now) {
-        long delay = enqueueEntry(entry, now);
+        long delay = enqueueOrForward(entry, now);
         try {
             if (delay >= DEBUG_DELAY_NANOS) {
                 if (delay > MAX_DELAY_NANOS) {
index 24264eeedff77bca598e58d4d3f4f4a6ac996c34..705bb0d9cd6e7a32da5f45884bc3d07b6f2592ed 100644 (file)
@@ -212,18 +212,31 @@ abstract class TransmitQueue {
         inflight.addLast(transmit(entry, now));
     }
 
-    /**
-     * Enqueue an entry, possibly also transmitting it.
-     *
-     * @return Delay to be forced on the calling thread, in nanoseconds.
-     */
-    final long enqueue(final ConnectionEntry entry, final long now) {
+    final long enqueueOrForward(final ConnectionEntry entry, final long now) {
         if (successor != null) {
             // This call will pay the enqueuing price, hence the caller does not have to
             successor.forwardEntry(entry, now);
             return 0;
         }
 
+        return enqueue(entry, now);
+    }
+
+    final void enqueueOrReplay(final ConnectionEntry entry, final long now) {
+        if (successor != null) {
+            successor.replayEntry(entry, now);
+        } else {
+            enqueue(entry, now);
+        }
+    }
+
+    /**
+     * Enqueue an entry, possibly also transmitting it.
+     *
+     * @return Delay to be forced on the calling thread, in nanoseconds.
+     */
+    private long enqueue(final ConnectionEntry entry, final long now) {
+
         // XXX: we should place a guard against incorrect entry sequences:
         // entry.getEnqueueTicks() should have non-negative difference from the last entry present in the queues
 
index b5f1bdac7e6a01f9d0d8eeb18d53609998b1b958..3800da865aabef86be1710c7fdf24f1b694a8766 100644 (file)
@@ -79,7 +79,7 @@ public abstract class AbstractTransmitQueueTest<T extends TransmitQueue> {
         final long now = Ticker.systemTicker().read();
         final int sentMessages = getMaxInFlightMessages() + 1;
         for (int i = 0; i < sentMessages; i++) {
-            queue.enqueue(new ConnectionEntry(request, callback, now), now);
+            queue.enqueueOrForward(new ConnectionEntry(request, callback, now), now);
         }
         final Collection<ConnectionEntry> entries = queue.drain();
         Assert.assertEquals(sentMessages, entries.size());
@@ -100,7 +100,7 @@ public abstract class AbstractTransmitQueueTest<T extends TransmitQueue> {
         final Request<?, ?> request = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, requestSequence, probe.ref());
         final Consumer<Response<?, ?>> callback = createConsumerMock();
         final long now = Ticker.systemTicker().read();
-        queue.enqueue(new ConnectionEntry(request, callback, now), now);
+        queue.enqueueOrForward(new ConnectionEntry(request, callback, now), now);
         //different transaction id
         final TransactionIdentifier anotherTxId = new TransactionIdentifier(HISTORY, 1L);
         final RequestSuccess<?, ?> success1 = new TransactionPurgeResponse(anotherTxId, requestSequence);
@@ -136,7 +136,7 @@ public abstract class AbstractTransmitQueueTest<T extends TransmitQueue> {
         final Request<?, ?> request = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 0L, probe.ref());
         final Consumer<Response<?, ?>> callback = createConsumerMock();
         final long now = Ticker.systemTicker().read();
-        queue.enqueue(new ConnectionEntry(request, callback, now), now);
+        queue.enqueueOrForward(new ConnectionEntry(request, callback, now), now);
         Assert.assertFalse(queue.isEmpty());
     }
 
@@ -148,8 +148,8 @@ public abstract class AbstractTransmitQueueTest<T extends TransmitQueue> {
         final long now = Ticker.systemTicker().read();
         final ConnectionEntry entry1 = new ConnectionEntry(request1, callback, now);
         final ConnectionEntry entry2 = new ConnectionEntry(request2, callback, now);
-        queue.enqueue(entry1, now);
-        queue.enqueue(entry2, now);
+        queue.enqueueOrForward(entry1, now);
+        queue.enqueueOrForward(entry2, now);
         Assert.assertEquals(entry1.getRequest(), queue.peek().getRequest());
     }
 
@@ -158,7 +158,7 @@ public abstract class AbstractTransmitQueueTest<T extends TransmitQueue> {
         final Request<?, ?> request = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 0L, probe.ref());
         final Consumer<Response<?, ?>> callback = createConsumerMock();
         final long now = Ticker.systemTicker().read();
-        queue.enqueue(new ConnectionEntry(request, callback, now), now);
+        queue.enqueueOrForward(new ConnectionEntry(request, callback, now), now);
         queue.poison(new RuntimeRequestException("fail", new RuntimeException("fail")));
         verify(callback).accept(any(TransactionFailure.class));
         Assert.assertTrue(queue.isEmpty());
index 11fc421903b77c522ed3c0d0dffd5e9e6cff3ca0..f7ea931365e819ab0529f6ada509be54c428ee32 100644 (file)
@@ -70,8 +70,8 @@ public class TransmittingTransmitQueueTest extends AbstractTransmitQueueTest<Tra
         final long now1 = now();
         final long now2 = now();
         //enqueue 2 entries
-        queue.enqueue(new ConnectionEntry(request1, callback1, now1), now1);
-        queue.enqueue(new ConnectionEntry(request2, callback2, now2), now2);
+        queue.enqueueOrForward(new ConnectionEntry(request1, callback1, now1), now1);
+        queue.enqueueOrForward(new ConnectionEntry(request2, callback2, now2), now2);
         final RequestSuccess<?, ?> success1 = new TransactionPurgeResponse(TRANSACTION_IDENTIFIER, sequence1);
         final RequestSuccess<?, ?> success2 = new TransactionPurgeResponse(transactionIdentifier2, sequence2);
         //complete entries in different order
@@ -96,7 +96,7 @@ public class TransmittingTransmitQueueTest extends AbstractTransmitQueueTest<Tra
         final Request<?, ?> request = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 0L, probe.ref());
         final Consumer<Response<?, ?>> callback = createConsumerMock();
         final long now = now();
-        queue.enqueue(new ConnectionEntry(request, callback, now), now);
+        queue.enqueueOrForward(new ConnectionEntry(request, callback, now), now);
         final RequestEnvelope requestEnvelope = probe.expectMsgClass(RequestEnvelope.class);
         assertEquals(request, requestEnvelope.getMessage());
     }
@@ -108,7 +108,7 @@ public class TransmittingTransmitQueueTest extends AbstractTransmitQueueTest<Tra
         final long now = now();
         final int sentMessages = getMaxInFlightMessages() + 1;
         for (int i = 0; i < sentMessages; i++) {
-            queue.enqueue(new ConnectionEntry(request, callback, now), now);
+            queue.enqueueOrForward(new ConnectionEntry(request, callback, now), now);
         }
         for (int i = 0; i < getMaxInFlightMessages(); i++) {
             probe.expectMsgClass(RequestEnvelope.class);
@@ -148,7 +148,7 @@ public class TransmittingTransmitQueueTest extends AbstractTransmitQueueTest<Tra
         final ReconnectForwarder forwarder = mock(ReconnectForwarder.class);
         queue.setForwarder(forwarder, ticker.read());
         final long secondEnqueueNow = ticker.read();
-        queue.enqueue(entry, secondEnqueueNow);
+        queue.enqueueOrForward(entry, secondEnqueueNow);
         verify(forwarder).forwardEntry(entry, secondEnqueueNow);
     }
 
@@ -164,10 +164,10 @@ public class TransmittingTransmitQueueTest extends AbstractTransmitQueueTest<Tra
         final Consumer<Response<?, ?>> callback = createConsumerMock();
 
         // Fill the queue up to capacity + 1
-        queue.enqueue(new ConnectionEntry(req0, callback, 0), 0);
-        queue.enqueue(new ConnectionEntry(req1, callback, 0), 0);
-        queue.enqueue(new ConnectionEntry(req2, callback, 0), 0);
-        queue.enqueue(new ConnectionEntry(req3, callback, 0), 0);
+        queue.enqueueOrForward(new ConnectionEntry(req0, callback, 0), 0);
+        queue.enqueueOrForward(new ConnectionEntry(req1, callback, 0), 0);
+        queue.enqueueOrForward(new ConnectionEntry(req2, callback, 0), 0);
+        queue.enqueueOrForward(new ConnectionEntry(req3, callback, 0), 0);
         assertEqualRequests(queue.getInflight(), req0, req1, req2);
         assertEqualRequests(queue.getPending(), req3);
 
@@ -182,12 +182,12 @@ public class TransmittingTransmitQueueTest extends AbstractTransmitQueueTest<Tra
         assertEqualRequests(queue.getPending());
 
         // Enqueue req4, which should be immediately transmitted
-        queue.enqueue(new ConnectionEntry(req4, callback, 0), 0);
+        queue.enqueueOrForward(new ConnectionEntry(req4, callback, 0), 0);
         assertEqualRequests(queue.getInflight(), req2, req3, req4);
         assertEqualRequests(queue.getPending());
 
         // Enqueue req5, which should move to pending
-        queue.enqueue(new ConnectionEntry(req5, callback, 0), 0);
+        queue.enqueueOrForward(new ConnectionEntry(req5, callback, 0), 0);
         assertEqualRequests(queue.getInflight(), req2, req3, req4);
         assertEqualRequests(queue.getPending(), req5);
 
@@ -197,7 +197,7 @@ public class TransmittingTransmitQueueTest extends AbstractTransmitQueueTest<Tra
         assertEqualRequests(queue.getPending(), req5);
 
         // ... and enqueue req6, which should cause req5 to be transmitted
-        queue.enqueue(new ConnectionEntry(req6, callback, 0), 0);
+        queue.enqueueOrForward(new ConnectionEntry(req6, callback, 0), 0);
         assertEqualRequests(queue.getInflight(), req2, req3, req5);
         assertEqualRequests(queue.getPending(), req6);
     }