From 90c86de54d7bf10bfb9ffb0a8ad6a818aeecc895 Mon Sep 17 00:00:00 2001 From: Robert Varga Date: Thu, 20 Jul 2017 17:51:52 +0200 Subject: [PATCH] BUG-8619: do not touch forward path during purge enqueue In case of a purge request, the request is sent from the head of a connection chain (i.e. the original connection which created the transaction) and propagated via forwarders. This path needs to make sure it does not go via throttling, as it is an internal detail. Separate the transmit paths a bit more, so that TransmitQueue can push messages to forwarders' replay path. Change-Id: I5e146b8d11e8654b4beae3959207efb9c2f18315 Signed-off-by: Robert Varga (cherry picked from commit b83c7f5e5cdaee5f250988182dccb749ac7432c2) --- .../client/AbstractClientConnection.java | 40 ++++++++++++++----- .../cluster/access/client/TransmitQueue.java | 25 +++++++++--- .../client/AbstractTransmitQueueTest.java | 12 +++--- .../client/TransmittingTransmitQueueTest.java | 24 +++++------ 4 files changed, 66 insertions(+), 35 deletions(-) diff --git a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/AbstractClientConnection.java b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/AbstractClientConnection.java index 1fa67632ea..3cc8d7073a 100644 --- a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/AbstractClientConnection.java +++ b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/AbstractClientConnection.java @@ -167,24 +167,42 @@ public abstract class AbstractClientConnection { enqueueEntry(new ConnectionEntry(request, callback, enqueuedTicks), currentTime()); } - public final long enqueueEntry(final ConnectionEntry entry, final long now) { + private long enqueueOrForward(final ConnectionEntry entry, final long now) { lock.lock(); try { - final RequestException maybePoison = poisoned; - if (maybePoison != null) { - throw new IllegalStateException("Connection " + this + " has been poisoned", maybePoison); - } + commonEnqueue(entry, now); + return queue.enqueueOrForward(entry, now); + } finally { + lock.unlock(); + } + } - if (queue.isEmpty()) { - // The queue is becoming non-empty, schedule a timer. - scheduleTimer(entry.getEnqueuedTicks() + context.config().getRequestTimeout() - now); - } - return queue.enqueue(entry, now); + /** + * Enqueue an entry, possibly also transmitting it. + */ + public final void enqueueEntry(final ConnectionEntry entry, final long now) { + lock.lock(); + try { + commonEnqueue(entry, now); + queue.enqueueOrReplay(entry, now); } finally { lock.unlock(); } } + @GuardedBy("lock") + private void commonEnqueue(final ConnectionEntry entry, final long now) { + final RequestException maybePoison = poisoned; + if (maybePoison != null) { + throw new IllegalStateException("Connection " + this + " has been poisoned", maybePoison); + } + + if (queue.isEmpty()) { + // The queue is becoming non-empty, schedule a timer. + scheduleTimer(entry.getEnqueuedTicks() + context.config().getRequestTimeout() - now); + } + } + // To be called from ClientActorBehavior on ConnectedClientConnection after entries are replayed. final void cancelDebt() { queue.cancelDebt(currentTime()); @@ -227,7 +245,7 @@ public abstract class AbstractClientConnection { RequestException runtimeRequestException); final void sendEntry(final ConnectionEntry entry, final long now) { - long delay = enqueueEntry(entry, now); + long delay = enqueueOrForward(entry, now); try { if (delay >= DEBUG_DELAY_NANOS) { if (delay > MAX_DELAY_NANOS) { diff --git a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/TransmitQueue.java b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/TransmitQueue.java index 24264eeedf..705bb0d9cd 100644 --- a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/TransmitQueue.java +++ b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/TransmitQueue.java @@ -212,18 +212,31 @@ abstract class TransmitQueue { inflight.addLast(transmit(entry, now)); } - /** - * Enqueue an entry, possibly also transmitting it. - * - * @return Delay to be forced on the calling thread, in nanoseconds. - */ - final long enqueue(final ConnectionEntry entry, final long now) { + final long enqueueOrForward(final ConnectionEntry entry, final long now) { if (successor != null) { // This call will pay the enqueuing price, hence the caller does not have to successor.forwardEntry(entry, now); return 0; } + return enqueue(entry, now); + } + + final void enqueueOrReplay(final ConnectionEntry entry, final long now) { + if (successor != null) { + successor.replayEntry(entry, now); + } else { + enqueue(entry, now); + } + } + + /** + * Enqueue an entry, possibly also transmitting it. + * + * @return Delay to be forced on the calling thread, in nanoseconds. + */ + private long enqueue(final ConnectionEntry entry, final long now) { + // XXX: we should place a guard against incorrect entry sequences: // entry.getEnqueueTicks() should have non-negative difference from the last entry present in the queues diff --git a/opendaylight/md-sal/cds-access-client/src/test/java/org/opendaylight/controller/cluster/access/client/AbstractTransmitQueueTest.java b/opendaylight/md-sal/cds-access-client/src/test/java/org/opendaylight/controller/cluster/access/client/AbstractTransmitQueueTest.java index b5f1bdac7e..3800da865a 100644 --- a/opendaylight/md-sal/cds-access-client/src/test/java/org/opendaylight/controller/cluster/access/client/AbstractTransmitQueueTest.java +++ b/opendaylight/md-sal/cds-access-client/src/test/java/org/opendaylight/controller/cluster/access/client/AbstractTransmitQueueTest.java @@ -79,7 +79,7 @@ public abstract class AbstractTransmitQueueTest { final long now = Ticker.systemTicker().read(); final int sentMessages = getMaxInFlightMessages() + 1; for (int i = 0; i < sentMessages; i++) { - queue.enqueue(new ConnectionEntry(request, callback, now), now); + queue.enqueueOrForward(new ConnectionEntry(request, callback, now), now); } final Collection entries = queue.drain(); Assert.assertEquals(sentMessages, entries.size()); @@ -100,7 +100,7 @@ public abstract class AbstractTransmitQueueTest { final Request request = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, requestSequence, probe.ref()); final Consumer> callback = createConsumerMock(); final long now = Ticker.systemTicker().read(); - queue.enqueue(new ConnectionEntry(request, callback, now), now); + queue.enqueueOrForward(new ConnectionEntry(request, callback, now), now); //different transaction id final TransactionIdentifier anotherTxId = new TransactionIdentifier(HISTORY, 1L); final RequestSuccess success1 = new TransactionPurgeResponse(anotherTxId, requestSequence); @@ -136,7 +136,7 @@ public abstract class AbstractTransmitQueueTest { final Request request = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 0L, probe.ref()); final Consumer> callback = createConsumerMock(); final long now = Ticker.systemTicker().read(); - queue.enqueue(new ConnectionEntry(request, callback, now), now); + queue.enqueueOrForward(new ConnectionEntry(request, callback, now), now); Assert.assertFalse(queue.isEmpty()); } @@ -148,8 +148,8 @@ public abstract class AbstractTransmitQueueTest { final long now = Ticker.systemTicker().read(); final ConnectionEntry entry1 = new ConnectionEntry(request1, callback, now); final ConnectionEntry entry2 = new ConnectionEntry(request2, callback, now); - queue.enqueue(entry1, now); - queue.enqueue(entry2, now); + queue.enqueueOrForward(entry1, now); + queue.enqueueOrForward(entry2, now); Assert.assertEquals(entry1.getRequest(), queue.peek().getRequest()); } @@ -158,7 +158,7 @@ public abstract class AbstractTransmitQueueTest { final Request request = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 0L, probe.ref()); final Consumer> callback = createConsumerMock(); final long now = Ticker.systemTicker().read(); - queue.enqueue(new ConnectionEntry(request, callback, now), now); + queue.enqueueOrForward(new ConnectionEntry(request, callback, now), now); queue.poison(new RuntimeRequestException("fail", new RuntimeException("fail"))); verify(callback).accept(any(TransactionFailure.class)); Assert.assertTrue(queue.isEmpty()); diff --git a/opendaylight/md-sal/cds-access-client/src/test/java/org/opendaylight/controller/cluster/access/client/TransmittingTransmitQueueTest.java b/opendaylight/md-sal/cds-access-client/src/test/java/org/opendaylight/controller/cluster/access/client/TransmittingTransmitQueueTest.java index 11fc421903..f7ea931365 100644 --- a/opendaylight/md-sal/cds-access-client/src/test/java/org/opendaylight/controller/cluster/access/client/TransmittingTransmitQueueTest.java +++ b/opendaylight/md-sal/cds-access-client/src/test/java/org/opendaylight/controller/cluster/access/client/TransmittingTransmitQueueTest.java @@ -70,8 +70,8 @@ public class TransmittingTransmitQueueTest extends AbstractTransmitQueueTest success1 = new TransactionPurgeResponse(TRANSACTION_IDENTIFIER, sequence1); final RequestSuccess success2 = new TransactionPurgeResponse(transactionIdentifier2, sequence2); //complete entries in different order @@ -96,7 +96,7 @@ public class TransmittingTransmitQueueTest extends AbstractTransmitQueueTest request = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 0L, probe.ref()); final Consumer> callback = createConsumerMock(); final long now = now(); - queue.enqueue(new ConnectionEntry(request, callback, now), now); + queue.enqueueOrForward(new ConnectionEntry(request, callback, now), now); final RequestEnvelope requestEnvelope = probe.expectMsgClass(RequestEnvelope.class); assertEquals(request, requestEnvelope.getMessage()); } @@ -108,7 +108,7 @@ public class TransmittingTransmitQueueTest extends AbstractTransmitQueueTest> callback = createConsumerMock(); // Fill the queue up to capacity + 1 - queue.enqueue(new ConnectionEntry(req0, callback, 0), 0); - queue.enqueue(new ConnectionEntry(req1, callback, 0), 0); - queue.enqueue(new ConnectionEntry(req2, callback, 0), 0); - queue.enqueue(new ConnectionEntry(req3, callback, 0), 0); + queue.enqueueOrForward(new ConnectionEntry(req0, callback, 0), 0); + queue.enqueueOrForward(new ConnectionEntry(req1, callback, 0), 0); + queue.enqueueOrForward(new ConnectionEntry(req2, callback, 0), 0); + queue.enqueueOrForward(new ConnectionEntry(req3, callback, 0), 0); assertEqualRequests(queue.getInflight(), req0, req1, req2); assertEqualRequests(queue.getPending(), req3); @@ -182,12 +182,12 @@ public class TransmittingTransmitQueueTest extends AbstractTransmitQueueTest