From c09801280b4c44f4ec26766e4d13b1a5d1f3ed59 Mon Sep 17 00:00:00 2001 From: Robert Varga Date: Mon, 29 May 2017 23:58:07 +0200 Subject: [PATCH] BUG-8494: fix throttling during reconnect ReconnectForwarder is called from differing code-paths: the one is during replay when we are dealing with late requests (those which have been waiting while we replaying), the other is subsequent user requests. The first one should not be waiting on the queue, as the requests have already entered it, hence have payed the cost of entry. The latter needs to pay for entering the queue, as otherwise we do not exert backpressure. This patch differentiates the two code paths, so they behave as they should. Also add more debug information in timer paths. Change-Id: I609be2332b13868ef1b9511399e2827d7f3d5b7d Signed-off-by: Robert Varga (cherry picked from commit 851fb56fba015c9fee3f0f9235c5c631a492ce59) --- .../client/AbstractClientConnection.java | 84 +++++++++++-------- .../access/client/ReconnectForwarder.java | 13 +-- .../client/SimpleReconnectForwarder.java | 7 +- .../cluster/access/client/TransmitQueue.java | 5 +- .../actors/dds/AbstractProxyTransaction.java | 13 ++- .../dds/BouncingReconnectForwarder.java | 35 +++++--- .../databroker/actors/dds/ProxyHistory.java | 35 +++++--- .../actors/dds/ProxyReconnectCohort.java | 8 +- 8 files changed, 124 insertions(+), 76 deletions(-) diff --git a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/AbstractClientConnection.java b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/AbstractClientConnection.java index 4425c43362..98442256c6 100644 --- a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/AbstractClientConnection.java +++ b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/AbstractClientConnection.java @@ -137,23 +137,7 @@ public abstract class AbstractClientConnection { */ public final void sendRequest(final Request request, final Consumer> callback) { final long now = currentTime(); - long delay = enqueueEntry(new ConnectionEntry(request, callback, now), now); - try { - if (delay >= DEBUG_DELAY_NANOS) { - if (delay > MAX_DELAY_NANOS) { - LOG.info("Capping {} throttle delay from {} to {} seconds", this, - TimeUnit.NANOSECONDS.toSeconds(delay), MAX_DELAY_SECONDS); - delay = MAX_DELAY_NANOS; - } - if (LOG.isDebugEnabled()) { - LOG.debug("Sleeping for {}ms", TimeUnit.NANOSECONDS.toMillis(delay)); - } - } - TimeUnit.NANOSECONDS.sleep(delay); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - LOG.debug("Interrupted after sleeping {}ns", e, currentTime() - now); - } + sendEntry(new ConnectionEntry(request, callback, now), now); } /** @@ -173,6 +157,24 @@ public abstract class AbstractClientConnection { enqueueEntry(new ConnectionEntry(request, callback, enqueuedTicks), currentTime()); } + public final long enqueueEntry(final ConnectionEntry entry, final long now) { + lock.lock(); + try { + final RequestException maybePoison = poisoned; + if (maybePoison != null) { + throw new IllegalStateException("Connection " + this + " has been poisoned", maybePoison); + } + + if (queue.isEmpty()) { + // The queue is becoming non-empty, schedule a timer. + scheduleTimer(entry.getEnqueuedTicks() + REQUEST_TIMEOUT_NANOS - now); + } + return queue.enqueue(entry, now); + } finally { + lock.unlock(); + } + } + public abstract Optional getBackendInfo(); final Collection startReplay() { @@ -209,21 +211,24 @@ public abstract class AbstractClientConnection { abstract ClientActorBehavior lockedReconnect(ClientActorBehavior current, RequestException runtimeRequestException); - final long enqueueEntry(final ConnectionEntry entry, final long now) { - lock.lock(); + final void sendEntry(final ConnectionEntry entry, final long now) { + long delay = enqueueEntry(entry, now); try { - final RequestException maybePoison = poisoned; - if (maybePoison != null) { - throw new IllegalStateException("Connection " + this + " has been poisoned", maybePoison); - } - - if (queue.isEmpty()) { - // The queue is becoming non-empty, schedule a timer. - scheduleTimer(entry.getEnqueuedTicks() + REQUEST_TIMEOUT_NANOS - now); + if (delay >= DEBUG_DELAY_NANOS) { + if (delay > MAX_DELAY_NANOS) { + LOG.info("Capping {} throttle delay from {} to {} seconds", this, + TimeUnit.NANOSECONDS.toSeconds(delay), MAX_DELAY_SECONDS); + delay = MAX_DELAY_NANOS; + } + if (LOG.isDebugEnabled()) { + LOG.debug("{}: Sleeping for {}ms on connection {}", context.persistenceId(), + TimeUnit.NANOSECONDS.toMillis(delay), this); + } } - return queue.enqueue(entry, now); - } finally { - lock.unlock(); + TimeUnit.NANOSECONDS.sleep(delay); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + LOG.debug("Interrupted after sleeping {}ns", e, currentTime() - now); } } @@ -244,11 +249,11 @@ public abstract class AbstractClientConnection { @GuardedBy("lock") private void scheduleTimer(final long delay) { if (haveTimer) { - LOG.debug("{}: timer already scheduled", context.persistenceId()); + LOG.debug("{}: timer already scheduled on {}", context.persistenceId(), this); return; } if (queue.hasSuccessor()) { - LOG.debug("{}: connection has successor, not scheduling timer", context.persistenceId()); + LOG.debug("{}: connection {} has a successor, not scheduling timer", context.persistenceId(), this); return; } @@ -257,7 +262,7 @@ public abstract class AbstractClientConnection { final long normalized = delay <= 0 ? 0 : Math.min(delay, BACKEND_ALIVE_TIMEOUT_NANOS); final FiniteDuration dur = FiniteDuration.fromNanos(normalized); - LOG.debug("{}: scheduling timeout in {}", context.persistenceId(), dur); + LOG.debug("{}: connection {} scheduling timeout in {}", context.persistenceId(), this, dur); context.executeInActor(this::runTimer, dur); haveTimer = true; } @@ -277,6 +282,9 @@ public abstract class AbstractClientConnection { try { haveTimer = false; final long now = currentTime(); + + LOG.debug("{}: running timer on {}", context.persistenceId(), this); + // The following line is only reliable when queue is not forwarding, but such state should not last long. // FIXME: BUG-8422: this may not be accurate w.r.t. replayed entries final long ticksSinceProgress = queue.ticksStalling(now); @@ -295,6 +303,7 @@ public abstract class AbstractClientConnection { delay = lockedCheckTimeout(now); if (delay == null) { // We have timed out. There is no point in scheduling a timer + LOG.debug("{}: connection {} timed out", context.persistenceId(), this); return lockedReconnect(current, new RuntimeRequestException("Backend connection timed out", new TimeoutException())); } @@ -302,6 +311,8 @@ public abstract class AbstractClientConnection { if (delay.isPresent()) { // If there is new delay, schedule a timer scheduleTimer(delay.get()); + } else { + LOG.debug("{}: not scheduling timeout on {}", context.persistenceId(), this); } } finally { lock.unlock(); @@ -335,13 +346,14 @@ public abstract class AbstractClientConnection { @GuardedBy("lock") private Optional lockedCheckTimeout(final long now) { if (queue.isEmpty()) { + LOG.debug("{}: connection {} is empty", context.persistenceId(), this); return Optional.empty(); } final long backendSilentTicks = backendSilentTicks(now); if (backendSilentTicks >= BACKEND_ALIVE_TIMEOUT_NANOS) { - LOG.debug("Connection {} has not seen activity from backend for {} nanoseconds, timing out", this, - backendSilentTicks); + LOG.debug("{}: Connection {} has not seen activity from backend for {} nanoseconds, timing out", + context.persistenceId(), this, backendSilentTicks); return null; } @@ -354,7 +366,7 @@ public abstract class AbstractClientConnection { tasksTimedOut++; queue.remove(now); - LOG.debug("Connection {} timed out entryt {}", this, head); + LOG.debug("{}: Connection {} timed out entry {}", context.persistenceId(), this, head); head.complete(head.getRequest().toRequestFailure( new RequestTimeoutException("Timed out after " + beenOpen + "ns"))); } diff --git a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ReconnectForwarder.java b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ReconnectForwarder.java index 25e5d6edfe..58c9e7549e 100644 --- a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ReconnectForwarder.java +++ b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ReconnectForwarder.java @@ -8,9 +8,6 @@ package org.opendaylight.controller.cluster.access.client; import com.google.common.base.Preconditions; -import java.util.function.Consumer; -import org.opendaylight.controller.cluster.access.concepts.Request; -import org.opendaylight.controller.cluster.access.concepts.Response; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -29,12 +26,18 @@ public abstract class ReconnectForwarder { this.successor = Preconditions.checkNotNull(successor); } - protected final void sendToSuccessor(final Request request, final Consumer> callback) { - successor.sendRequest(request, callback); + protected final void sendToSuccessor(final ConnectionEntry entry) { + successor.sendRequest(entry.getRequest(), entry.getCallback()); + } + + protected final void replayToSuccessor(final ConnectionEntry entry) { + successor.enqueueRequest(entry.getRequest(), entry.getCallback(), entry.getEnqueuedTicks()); } protected abstract void forwardEntry(ConnectionEntry entry, long now); + protected abstract void replayEntry(ConnectionEntry entry, long now); + final AbstractReceivingClientConnection successor() { return successor; } diff --git a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/SimpleReconnectForwarder.java b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/SimpleReconnectForwarder.java index 2def9a1015..90ec49e5d0 100644 --- a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/SimpleReconnectForwarder.java +++ b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/SimpleReconnectForwarder.java @@ -15,7 +15,12 @@ final class SimpleReconnectForwarder extends ReconnectForwarder { @Override protected void forwardEntry(final ConnectionEntry entry, final long now) { - // We are ignoring requested delay, as we have already paid the admission delay + successor().sendEntry(entry, now); + } + + @Override + protected void replayEntry(final ConnectionEntry entry, final long now) { + // We are executing in the context of the client thread, do not block successor().enqueueEntry(entry, now); } } diff --git a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/TransmitQueue.java b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/TransmitQueue.java index b7543410cd..178a46cb0a 100644 --- a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/TransmitQueue.java +++ b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/TransmitQueue.java @@ -188,6 +188,7 @@ abstract class TransmitQueue { */ final long enqueue(final ConnectionEntry entry, final long now) { if (successor != null) { + // This call will pay the enqueuing price, hence the caller does not have to successor.forwardEntry(entry, now); return 0; } @@ -257,14 +258,14 @@ abstract class TransmitQueue { int count = 0; ConnectionEntry entry = inflight.poll(); while (entry != null) { - successor.forwardEntry(entry, now); + successor.replayEntry(entry, now); entry = inflight.poll(); count++; } entry = pending.poll(); while (entry != null) { - successor.forwardEntry(entry, now); + successor.replayEntry(entry, now); entry = pending.poll(); count++; } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractProxyTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractProxyTransaction.java index cb568647af..1eff70f6d9 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractProxyTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractProxyTransaction.java @@ -645,11 +645,11 @@ abstract class AbstractProxyTransaction implements Identifiable) obj, resp -> { }, now); + successor.doReplayRequest((TransactionRequest) obj, resp -> { }, now); } else { Verify.verify(obj instanceof IncrementSequence); final IncrementSequence increment = (IncrementSequence) obj; - successor.replayRequest(new IncrementTransactionSequenceRequest(getIdentifier(), + successor.doReplayRequest(new IncrementTransactionSequenceRequest(getIdentifier(), increment.getSequence(), localActor(), isSnapshotOnly(), increment.getDelta()), resp -> { }, now); LOG.debug("Incrementing sequence {} to successor {}", obj, successor); @@ -668,7 +668,7 @@ abstract class AbstractProxyTransaction implements Identifiable) req, e.getCallback(), e.getEnqueuedTicks()); + successor.doReplayRequest((TransactionRequest) req, e.getCallback(), e.getEnqueuedTicks()); it.remove(); } } @@ -696,7 +696,7 @@ abstract class AbstractProxyTransaction implements Identifiable request, final Consumer> callback, + private void doReplayRequest(final TransactionRequest request, final Consumer> callback, final long enqueuedTicks) { if (request instanceof AbstractLocalTransactionRequest) { handleReplayedLocalRequest((AbstractLocalTransactionRequest) request, callback, enqueuedTicks); @@ -736,6 +736,11 @@ abstract class AbstractProxyTransaction implements Identifiable request, final Consumer> callback, + final long enqueuedTicks) { + getSuccessorState().getSuccessor().doReplayRequest(request, callback, enqueuedTicks); + } + abstract boolean isSnapshotOnly(); abstract void doDelete(YangInstanceIdentifier path); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/BouncingReconnectForwarder.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/BouncingReconnectForwarder.java index a518c55169..26e346e77c 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/BouncingReconnectForwarder.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/BouncingReconnectForwarder.java @@ -57,6 +57,23 @@ final class BouncingReconnectForwarder extends ReconnectForwarder { @Override protected void forwardEntry(final ConnectionEntry entry, final long now) { + try { + findCohort(entry).forwardEntry(entry, this::sendToSuccessor); + } catch (RequestException e) { + entry.complete(entry.getRequest().toRequestFailure(e)); + } + } + + @Override + protected void replayEntry(final ConnectionEntry entry, final long now) { + try { + findCohort(entry).replayEntry(entry, this::replayToSuccessor); + } catch (RequestException e) { + entry.complete(entry.getRequest().toRequestFailure(e)); + } + } + + private ProxyReconnectCohort findCohort(final ConnectionEntry entry) throws CohortNotFoundException { final Request request = entry.getRequest(); final LocalHistoryIdentifier historyId; @@ -68,16 +85,12 @@ final class BouncingReconnectForwarder extends ReconnectForwarder { throw new IllegalArgumentException("Unhandled request " + request); } - try { - final ProxyReconnectCohort cohort = cohorts.get(historyId); - if (cohort == null) { - LOG.warn("Cohort for request {} not found, aborting it", request); - throw new CohortNotFoundException(historyId); - } - - cohort.forwardRequest(request, entry.getCallback(), this::sendToSuccessor); - } catch (RequestException e) { - entry.complete(request.toRequestFailure(e)); + final ProxyReconnectCohort cohort = cohorts.get(historyId); + if (cohort == null) { + LOG.warn("Cohort for request {} not found, aborting it", request); + throw new CohortNotFoundException(historyId); } + + return cohort; } -} \ No newline at end of file +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ProxyHistory.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ProxyHistory.java index e75a2df4c0..e26e00fa13 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ProxyHistory.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ProxyHistory.java @@ -19,7 +19,6 @@ import java.util.Optional; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; -import java.util.function.BiConsumer; import java.util.function.Consumer; import javax.annotation.concurrent.GuardedBy; import org.opendaylight.controller.cluster.access.client.AbstractClientConnection; @@ -272,22 +271,34 @@ abstract class ProxyHistory implements Identifiable { } @Override - void forwardRequest(final Request request, final Consumer> callback, - final BiConsumer, Consumer>> forwardTo) throws RequestException { - // FIXME: do not use sendRequest() once we have throttling in place, as we have already waited the - // period required to get into the queue. + void replayEntry(final ConnectionEntry entry, final Consumer replayTo) + throws RequestException { + final Request request = entry.getRequest(); if (request instanceof TransactionRequest) { - forwardTransactionRequest((TransactionRequest) request, callback); + lookupProxy(request).replayRequest((TransactionRequest) request, entry.getCallback(), + entry.getEnqueuedTicks()); } else if (request instanceof LocalHistoryRequest) { - forwardTo.accept(request, callback); + replayTo.accept(entry); } else { throw new IllegalArgumentException("Unhandled request " + request); } } - private void forwardTransactionRequest(final TransactionRequest request, - final Consumer> callback) throws RequestException { + @Override + void forwardEntry(final ConnectionEntry entry, final Consumer forwardTo) + throws RequestException { + final Request request = entry.getRequest(); + if (request instanceof TransactionRequest) { + lookupProxy(request).forwardRequest((TransactionRequest) request, entry.getCallback()); + } else if (request instanceof LocalHistoryRequest) { + forwardTo.accept(entry); + } else { + throw new IllegalArgumentException("Unhandled request " + request); + } + } + private AbstractProxyTransaction lookupProxy(final Request request) + throws RequestReplayException { final AbstractProxyTransaction proxy; lock.lock(); try { @@ -295,11 +306,11 @@ abstract class ProxyHistory implements Identifiable { } finally { lock.unlock(); } - if (proxy == null) { - throw new RequestReplayException("Failed to find proxy for %s", request); + if (proxy != null) { + return proxy; } - proxy.forwardRequest(request, callback); + throw new RequestReplayException("Failed to find proxy for %s", request); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ProxyReconnectCohort.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ProxyReconnectCohort.java index 14bf0645f7..11e612c69d 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ProxyReconnectCohort.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ProxyReconnectCohort.java @@ -8,13 +8,10 @@ package org.opendaylight.controller.cluster.databroker.actors.dds; import java.util.Collection; -import java.util.function.BiConsumer; import java.util.function.Consumer; import org.opendaylight.controller.cluster.access.client.ConnectionEntry; import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier; -import org.opendaylight.controller.cluster.access.concepts.Request; import org.opendaylight.controller.cluster.access.concepts.RequestException; -import org.opendaylight.controller.cluster.access.concepts.Response; import org.opendaylight.yangtools.concepts.Identifiable; abstract class ProxyReconnectCohort implements Identifiable { @@ -23,6 +20,7 @@ abstract class ProxyReconnectCohort implements Identifiable request, Consumer> callback, - BiConsumer, Consumer>> replayTo) throws RequestException; + abstract void replayEntry(ConnectionEntry entry, Consumer replayTo) throws RequestException; + + abstract void forwardEntry(ConnectionEntry entry, Consumer forwardTo) throws RequestException; } -- 2.36.6