BUG-8619: do not touch forward path during purge enqueue 00/60700/1
authorRobert Varga <robert.varga@pantheon.tech>
Thu, 20 Jul 2017 15:51:52 +0000 (17:51 +0200)
committerRobert Varga <robert.varga@pantheon.tech>
Mon, 24 Jul 2017 20:45:31 +0000 (22:45 +0200)
In case of a purge request, the request is sent from the head
of a connection chain (i.e. the original connection which created
the transaction) and propagated via forwarders. This path needs
to make sure it does not go via throttling, as it is an internal
detail.

Separate the transmit paths a bit more, so that TransmitQueue
can push messages to forwarders' replay path.

Change-Id: I5e146b8d11e8654b4beae3959207efb9c2f18315
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
(cherry picked from commit b83c7f5e5cdaee5f250988182dccb749ac7432c2)

opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/AbstractClientConnection.java
opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/TransmitQueue.java
opendaylight/md-sal/cds-access-client/src/test/java/org/opendaylight/controller/cluster/access/client/AbstractTransmitQueueTest.java
opendaylight/md-sal/cds-access-client/src/test/java/org/opendaylight/controller/cluster/access/client/TransmittingTransmitQueueTest.java

index 1fa6763..3cc8d70 100644 (file)
@@ -167,24 +167,42 @@ public abstract class AbstractClientConnection<T extends BackendInfo> {
         enqueueEntry(new ConnectionEntry(request, callback, enqueuedTicks), currentTime());
     }
 
-    public final long enqueueEntry(final ConnectionEntry entry, final long now) {
+    private long enqueueOrForward(final ConnectionEntry entry, final long now) {
         lock.lock();
         try {
-            final RequestException maybePoison = poisoned;
-            if (maybePoison != null) {
-                throw new IllegalStateException("Connection " + this + " has been poisoned", maybePoison);
-            }
+            commonEnqueue(entry, now);
+            return queue.enqueueOrForward(entry, now);
+        } finally {
+            lock.unlock();
+        }
+    }
 
-            if (queue.isEmpty()) {
-                // The queue is becoming non-empty, schedule a timer.
-                scheduleTimer(entry.getEnqueuedTicks() + context.config().getRequestTimeout() - now);
-            }
-            return queue.enqueue(entry, now);
+    /**
+     * Enqueue an entry, possibly also transmitting it.
+     */
+    public final void enqueueEntry(final ConnectionEntry entry, final long now) {
+        lock.lock();
+        try {
+            commonEnqueue(entry, now);
+            queue.enqueueOrReplay(entry, now);
         } finally {
             lock.unlock();
         }
     }
 
+    @GuardedBy("lock")
+    private void commonEnqueue(final ConnectionEntry entry, final long now) {
+        final RequestException maybePoison = poisoned;
+        if (maybePoison != null) {
+            throw new IllegalStateException("Connection " + this + " has been poisoned", maybePoison);
+        }
+
+        if (queue.isEmpty()) {
+            // The queue is becoming non-empty, schedule a timer.
+            scheduleTimer(entry.getEnqueuedTicks() + context.config().getRequestTimeout() - now);
+        }
+    }
+
     // To be called from ClientActorBehavior on ConnectedClientConnection after entries are replayed.
     final void cancelDebt() {
         queue.cancelDebt(currentTime());
@@ -227,7 +245,7 @@ public abstract class AbstractClientConnection<T extends BackendInfo> {
             RequestException runtimeRequestException);
 
     final void sendEntry(final ConnectionEntry entry, final long now) {
-        long delay = enqueueEntry(entry, now);
+        long delay = enqueueOrForward(entry, now);
         try {
             if (delay >= DEBUG_DELAY_NANOS) {
                 if (delay > MAX_DELAY_NANOS) {
index 24264ee..705bb0d 100644 (file)
@@ -212,18 +212,31 @@ abstract class TransmitQueue {
         inflight.addLast(transmit(entry, now));
     }
 
-    /**
-     * Enqueue an entry, possibly also transmitting it.
-     *
-     * @return Delay to be forced on the calling thread, in nanoseconds.
-     */
-    final long enqueue(final ConnectionEntry entry, final long now) {
+    final long enqueueOrForward(final ConnectionEntry entry, final long now) {
         if (successor != null) {
             // This call will pay the enqueuing price, hence the caller does not have to
             successor.forwardEntry(entry, now);
             return 0;
         }
 
+        return enqueue(entry, now);
+    }
+
+    final void enqueueOrReplay(final ConnectionEntry entry, final long now) {
+        if (successor != null) {
+            successor.replayEntry(entry, now);
+        } else {
+            enqueue(entry, now);
+        }
+    }
+
+    /**
+     * Enqueue an entry, possibly also transmitting it.
+     *
+     * @return Delay to be forced on the calling thread, in nanoseconds.
+     */
+    private long enqueue(final ConnectionEntry entry, final long now) {
+
         // XXX: we should place a guard against incorrect entry sequences:
         // entry.getEnqueueTicks() should have non-negative difference from the last entry present in the queues
 
index b5f1bda..3800da8 100644 (file)
@@ -79,7 +79,7 @@ public abstract class AbstractTransmitQueueTest<T extends TransmitQueue> {
         final long now = Ticker.systemTicker().read();
         final int sentMessages = getMaxInFlightMessages() + 1;
         for (int i = 0; i < sentMessages; i++) {
-            queue.enqueue(new ConnectionEntry(request, callback, now), now);
+            queue.enqueueOrForward(new ConnectionEntry(request, callback, now), now);
         }
         final Collection<ConnectionEntry> entries = queue.drain();
         Assert.assertEquals(sentMessages, entries.size());
@@ -100,7 +100,7 @@ public abstract class AbstractTransmitQueueTest<T extends TransmitQueue> {
         final Request<?, ?> request = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, requestSequence, probe.ref());
         final Consumer<Response<?, ?>> callback = createConsumerMock();
         final long now = Ticker.systemTicker().read();
-        queue.enqueue(new ConnectionEntry(request, callback, now), now);
+        queue.enqueueOrForward(new ConnectionEntry(request, callback, now), now);
         //different transaction id
         final TransactionIdentifier anotherTxId = new TransactionIdentifier(HISTORY, 1L);
         final RequestSuccess<?, ?> success1 = new TransactionPurgeResponse(anotherTxId, requestSequence);
@@ -136,7 +136,7 @@ public abstract class AbstractTransmitQueueTest<T extends TransmitQueue> {
         final Request<?, ?> request = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 0L, probe.ref());
         final Consumer<Response<?, ?>> callback = createConsumerMock();
         final long now = Ticker.systemTicker().read();
-        queue.enqueue(new ConnectionEntry(request, callback, now), now);
+        queue.enqueueOrForward(new ConnectionEntry(request, callback, now), now);
         Assert.assertFalse(queue.isEmpty());
     }
 
@@ -148,8 +148,8 @@ public abstract class AbstractTransmitQueueTest<T extends TransmitQueue> {
         final long now = Ticker.systemTicker().read();
         final ConnectionEntry entry1 = new ConnectionEntry(request1, callback, now);
         final ConnectionEntry entry2 = new ConnectionEntry(request2, callback, now);
-        queue.enqueue(entry1, now);
-        queue.enqueue(entry2, now);
+        queue.enqueueOrForward(entry1, now);
+        queue.enqueueOrForward(entry2, now);
         Assert.assertEquals(entry1.getRequest(), queue.peek().getRequest());
     }
 
@@ -158,7 +158,7 @@ public abstract class AbstractTransmitQueueTest<T extends TransmitQueue> {
         final Request<?, ?> request = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 0L, probe.ref());
         final Consumer<Response<?, ?>> callback = createConsumerMock();
         final long now = Ticker.systemTicker().read();
-        queue.enqueue(new ConnectionEntry(request, callback, now), now);
+        queue.enqueueOrForward(new ConnectionEntry(request, callback, now), now);
         queue.poison(new RuntimeRequestException("fail", new RuntimeException("fail")));
         verify(callback).accept(any(TransactionFailure.class));
         Assert.assertTrue(queue.isEmpty());
index 11fc421..f7ea931 100644 (file)
@@ -70,8 +70,8 @@ public class TransmittingTransmitQueueTest extends AbstractTransmitQueueTest<Tra
         final long now1 = now();
         final long now2 = now();
         //enqueue 2 entries
-        queue.enqueue(new ConnectionEntry(request1, callback1, now1), now1);
-        queue.enqueue(new ConnectionEntry(request2, callback2, now2), now2);
+        queue.enqueueOrForward(new ConnectionEntry(request1, callback1, now1), now1);
+        queue.enqueueOrForward(new ConnectionEntry(request2, callback2, now2), now2);
         final RequestSuccess<?, ?> success1 = new TransactionPurgeResponse(TRANSACTION_IDENTIFIER, sequence1);
         final RequestSuccess<?, ?> success2 = new TransactionPurgeResponse(transactionIdentifier2, sequence2);
         //complete entries in different order
@@ -96,7 +96,7 @@ public class TransmittingTransmitQueueTest extends AbstractTransmitQueueTest<Tra
         final Request<?, ?> request = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 0L, probe.ref());
         final Consumer<Response<?, ?>> callback = createConsumerMock();
         final long now = now();
-        queue.enqueue(new ConnectionEntry(request, callback, now), now);
+        queue.enqueueOrForward(new ConnectionEntry(request, callback, now), now);
         final RequestEnvelope requestEnvelope = probe.expectMsgClass(RequestEnvelope.class);
         assertEquals(request, requestEnvelope.getMessage());
     }
@@ -108,7 +108,7 @@ public class TransmittingTransmitQueueTest extends AbstractTransmitQueueTest<Tra
         final long now = now();
         final int sentMessages = getMaxInFlightMessages() + 1;
         for (int i = 0; i < sentMessages; i++) {
-            queue.enqueue(new ConnectionEntry(request, callback, now), now);
+            queue.enqueueOrForward(new ConnectionEntry(request, callback, now), now);
         }
         for (int i = 0; i < getMaxInFlightMessages(); i++) {
             probe.expectMsgClass(RequestEnvelope.class);
@@ -148,7 +148,7 @@ public class TransmittingTransmitQueueTest extends AbstractTransmitQueueTest<Tra
         final ReconnectForwarder forwarder = mock(ReconnectForwarder.class);
         queue.setForwarder(forwarder, ticker.read());
         final long secondEnqueueNow = ticker.read();
-        queue.enqueue(entry, secondEnqueueNow);
+        queue.enqueueOrForward(entry, secondEnqueueNow);
         verify(forwarder).forwardEntry(entry, secondEnqueueNow);
     }
 
@@ -164,10 +164,10 @@ public class TransmittingTransmitQueueTest extends AbstractTransmitQueueTest<Tra
         final Consumer<Response<?, ?>> callback = createConsumerMock();
 
         // Fill the queue up to capacity + 1
-        queue.enqueue(new ConnectionEntry(req0, callback, 0), 0);
-        queue.enqueue(new ConnectionEntry(req1, callback, 0), 0);
-        queue.enqueue(new ConnectionEntry(req2, callback, 0), 0);
-        queue.enqueue(new ConnectionEntry(req3, callback, 0), 0);
+        queue.enqueueOrForward(new ConnectionEntry(req0, callback, 0), 0);
+        queue.enqueueOrForward(new ConnectionEntry(req1, callback, 0), 0);
+        queue.enqueueOrForward(new ConnectionEntry(req2, callback, 0), 0);
+        queue.enqueueOrForward(new ConnectionEntry(req3, callback, 0), 0);
         assertEqualRequests(queue.getInflight(), req0, req1, req2);
         assertEqualRequests(queue.getPending(), req3);
 
@@ -182,12 +182,12 @@ public class TransmittingTransmitQueueTest extends AbstractTransmitQueueTest<Tra
         assertEqualRequests(queue.getPending());
 
         // Enqueue req4, which should be immediately transmitted
-        queue.enqueue(new ConnectionEntry(req4, callback, 0), 0);
+        queue.enqueueOrForward(new ConnectionEntry(req4, callback, 0), 0);
         assertEqualRequests(queue.getInflight(), req2, req3, req4);
         assertEqualRequests(queue.getPending());
 
         // Enqueue req5, which should move to pending
-        queue.enqueue(new ConnectionEntry(req5, callback, 0), 0);
+        queue.enqueueOrForward(new ConnectionEntry(req5, callback, 0), 0);
         assertEqualRequests(queue.getInflight(), req2, req3, req4);
         assertEqualRequests(queue.getPending(), req5);
 
@@ -197,7 +197,7 @@ public class TransmittingTransmitQueueTest extends AbstractTransmitQueueTest<Tra
         assertEqualRequests(queue.getPending(), req5);
 
         // ... and enqueue req6, which should cause req5 to be transmitted
-        queue.enqueue(new ConnectionEntry(req6, callback, 0), 0);
+        queue.enqueueOrForward(new ConnectionEntry(req6, callback, 0), 0);
         assertEqualRequests(queue.getInflight(), req2, req3, req5);
         assertEqualRequests(queue.getPending(), req6);
     }

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.