BUG-8494: fix throttling during reconnect 14/58214/2
author Robert Varga <robert.varga@pantheon.tech>
Mon, 29 May 2017 21:58:07 +0000 (23:58 +0200)
committer Robert Varga <robert.varga@pantheon.tech>
Sun, 4 Jun 2017 15:32:26 +0000 (17:32 +0200)
ReconnectForwarder is called from two different code paths: one is
during replay, when we are dealing with late requests (those which have
been waiting while we were replaying); the other is subsequent user requests.

The first one should not be waiting on the queue, as the requests have
already entered it and hence have paid the cost of entry. The latter needs
to pay for entering the queue, as otherwise we do not exert backpressure.

This patch differentiates the two code paths, so they behave as they
should. Also add more debug information in timer paths.

Change-Id: I609be2332b13868ef1b9511399e2827d7f3d5b7d
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
(cherry picked from commit 851fb56fba015c9fee3f0f9235c5c631a492ce59)

opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/AbstractClientConnection.java
opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ReconnectForwarder.java
opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/SimpleReconnectForwarder.java
opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/TransmitQueue.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractProxyTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/BouncingReconnectForwarder.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ProxyHistory.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ProxyReconnectCohort.java

index 4425c4336263399ff6818f477c8e0916f0fb5e5b..98442256c6ecbcc9f63f54586d05611a80797d36 100644 (file)
@@ -137,23 +137,7 @@ public abstract class AbstractClientConnection<T extends BackendInfo> {
      */
     public final void sendRequest(final Request<?, ?> request, final Consumer<Response<?, ?>> callback) {
         final long now = currentTime();
-        long delay = enqueueEntry(new ConnectionEntry(request, callback, now), now);
-        try {
-            if (delay >= DEBUG_DELAY_NANOS) {
-                if (delay > MAX_DELAY_NANOS) {
-                    LOG.info("Capping {} throttle delay from {} to {} seconds", this,
-                        TimeUnit.NANOSECONDS.toSeconds(delay), MAX_DELAY_SECONDS);
-                    delay = MAX_DELAY_NANOS;
-                }
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("Sleeping for {}ms", TimeUnit.NANOSECONDS.toMillis(delay));
-                }
-            }
-            TimeUnit.NANOSECONDS.sleep(delay);
-        } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-            LOG.debug("Interrupted after sleeping {}ns", e, currentTime() - now);
-        }
+        sendEntry(new ConnectionEntry(request, callback, now), now);
     }
 
     /**
@@ -173,6 +157,24 @@ public abstract class AbstractClientConnection<T extends BackendInfo> {
         enqueueEntry(new ConnectionEntry(request, callback, enqueuedTicks), currentTime());
     }
 
+    public final long enqueueEntry(final ConnectionEntry entry, final long now) {
+        lock.lock();
+        try {
+            final RequestException maybePoison = poisoned;
+            if (maybePoison != null) {
+                throw new IllegalStateException("Connection " + this + " has been poisoned", maybePoison);
+            }
+
+            if (queue.isEmpty()) {
+                // The queue is becoming non-empty, schedule a timer.
+                scheduleTimer(entry.getEnqueuedTicks() + REQUEST_TIMEOUT_NANOS - now);
+            }
+            return queue.enqueue(entry, now);
+        } finally {
+            lock.unlock();
+        }
+    }
+
     public abstract Optional<T> getBackendInfo();
 
     final Collection<ConnectionEntry> startReplay() {
@@ -209,21 +211,24 @@ public abstract class AbstractClientConnection<T extends BackendInfo> {
     abstract ClientActorBehavior<T> lockedReconnect(ClientActorBehavior<T> current,
             RequestException runtimeRequestException);
 
-    final long enqueueEntry(final ConnectionEntry entry, final long now) {
-        lock.lock();
+    final void sendEntry(final ConnectionEntry entry, final long now) {
+        long delay = enqueueEntry(entry, now);
         try {
-            final RequestException maybePoison = poisoned;
-            if (maybePoison != null) {
-                throw new IllegalStateException("Connection " + this + " has been poisoned", maybePoison);
-            }
-
-            if (queue.isEmpty()) {
-                // The queue is becoming non-empty, schedule a timer.
-                scheduleTimer(entry.getEnqueuedTicks() + REQUEST_TIMEOUT_NANOS - now);
+            if (delay >= DEBUG_DELAY_NANOS) {
+                if (delay > MAX_DELAY_NANOS) {
+                    LOG.info("Capping {} throttle delay from {} to {} seconds", this,
+                        TimeUnit.NANOSECONDS.toSeconds(delay), MAX_DELAY_SECONDS);
+                    delay = MAX_DELAY_NANOS;
+                }
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("{}: Sleeping for {}ms on connection {}", context.persistenceId(),
+                        TimeUnit.NANOSECONDS.toMillis(delay), this);
+                }
             }
-            return queue.enqueue(entry, now);
-        } finally {
-            lock.unlock();
+            TimeUnit.NANOSECONDS.sleep(delay);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            LOG.debug("Interrupted after sleeping {}ns", e, currentTime() - now);
         }
     }
 
@@ -244,11 +249,11 @@ public abstract class AbstractClientConnection<T extends BackendInfo> {
     @GuardedBy("lock")
     private void scheduleTimer(final long delay) {
         if (haveTimer) {
-            LOG.debug("{}: timer already scheduled", context.persistenceId());
+            LOG.debug("{}: timer already scheduled on {}", context.persistenceId(), this);
             return;
         }
         if (queue.hasSuccessor()) {
-            LOG.debug("{}: connection has successor, not scheduling timer", context.persistenceId());
+            LOG.debug("{}: connection {} has a successor, not scheduling timer", context.persistenceId(), this);
             return;
         }
 
@@ -257,7 +262,7 @@ public abstract class AbstractClientConnection<T extends BackendInfo> {
         final long normalized =  delay <= 0 ? 0 : Math.min(delay, BACKEND_ALIVE_TIMEOUT_NANOS);
 
         final FiniteDuration dur = FiniteDuration.fromNanos(normalized);
-        LOG.debug("{}: scheduling timeout in {}", context.persistenceId(), dur);
+        LOG.debug("{}: connection {} scheduling timeout in {}", context.persistenceId(), this, dur);
         context.executeInActor(this::runTimer, dur);
         haveTimer = true;
     }
@@ -277,6 +282,9 @@ public abstract class AbstractClientConnection<T extends BackendInfo> {
         try {
             haveTimer = false;
             final long now = currentTime();
+
+            LOG.debug("{}: running timer on {}", context.persistenceId(), this);
+
             // The following line is only reliable when queue is not forwarding, but such state should not last long.
             // FIXME: BUG-8422: this may not be accurate w.r.t. replayed entries
             final long ticksSinceProgress = queue.ticksStalling(now);
@@ -295,6 +303,7 @@ public abstract class AbstractClientConnection<T extends BackendInfo> {
             delay = lockedCheckTimeout(now);
             if (delay == null) {
                 // We have timed out. There is no point in scheduling a timer
+                LOG.debug("{}: connection {} timed out", context.persistenceId(), this);
                 return lockedReconnect(current, new RuntimeRequestException("Backend connection timed out",
                     new TimeoutException()));
             }
@@ -302,6 +311,8 @@ public abstract class AbstractClientConnection<T extends BackendInfo> {
             if (delay.isPresent()) {
                 // If there is new delay, schedule a timer
                 scheduleTimer(delay.get());
+            } else {
+                LOG.debug("{}: not scheduling timeout on {}", context.persistenceId(), this);
             }
         } finally {
             lock.unlock();
@@ -335,13 +346,14 @@ public abstract class AbstractClientConnection<T extends BackendInfo> {
     @GuardedBy("lock")
     private Optional<Long> lockedCheckTimeout(final long now) {
         if (queue.isEmpty()) {
+            LOG.debug("{}: connection {} is empty", context.persistenceId(), this);
             return Optional.empty();
         }
 
         final long backendSilentTicks = backendSilentTicks(now);
         if (backendSilentTicks >= BACKEND_ALIVE_TIMEOUT_NANOS) {
-            LOG.debug("Connection {} has not seen activity from backend for {} nanoseconds, timing out", this,
-                backendSilentTicks);
+            LOG.debug("{}: Connection {} has not seen activity from backend for {} nanoseconds, timing out",
+                context.persistenceId(), this, backendSilentTicks);
             return null;
         }
 
@@ -354,7 +366,7 @@ public abstract class AbstractClientConnection<T extends BackendInfo> {
 
             tasksTimedOut++;
             queue.remove(now);
-            LOG.debug("Connection {} timed out entryt {}", this, head);
+            LOG.debug("{}: Connection {} timed out entry {}", context.persistenceId(), this, head);
             head.complete(head.getRequest().toRequestFailure(
                 new RequestTimeoutException("Timed out after " + beenOpen + "ns")));
         }
index 25e5d6edfebc8acabb10347480a5c6f01ff19a5e..58c9e7549e493c5587cdaceb299de5b4b603d565 100644 (file)
@@ -8,9 +8,6 @@
 package org.opendaylight.controller.cluster.access.client;
 
 import com.google.common.base.Preconditions;
-import java.util.function.Consumer;
-import org.opendaylight.controller.cluster.access.concepts.Request;
-import org.opendaylight.controller.cluster.access.concepts.Response;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -29,12 +26,18 @@ public abstract class ReconnectForwarder {
         this.successor = Preconditions.checkNotNull(successor);
     }
 
-    protected final void sendToSuccessor(final Request<?, ?> request, final Consumer<Response<?, ?>> callback) {
-        successor.sendRequest(request, callback);
+    protected final void sendToSuccessor(final ConnectionEntry entry) {
+        successor.sendRequest(entry.getRequest(), entry.getCallback());
+    }
+
+    protected final void replayToSuccessor(final ConnectionEntry entry) {
+        successor.enqueueRequest(entry.getRequest(), entry.getCallback(), entry.getEnqueuedTicks());
     }
 
     protected abstract void forwardEntry(ConnectionEntry entry, long now);
 
+    protected abstract void replayEntry(ConnectionEntry entry, long now);
+
     final AbstractReceivingClientConnection<?> successor() {
         return successor;
     }
index 2def9a10152663b50c4e829436900b89955f42d9..90ec49e5d0b64f9a674a9bfc99c6ac42add21cc3 100644 (file)
@@ -15,7 +15,12 @@ final class SimpleReconnectForwarder extends ReconnectForwarder {
 
     @Override
     protected void forwardEntry(final ConnectionEntry entry, final long now) {
-        // We are ignoring requested delay, as we have already paid the admission delay
+        successor().sendEntry(entry, now);
+    }
+
+    @Override
+    protected void replayEntry(final ConnectionEntry entry, final long now) {
+        // We are executing in the context of the client thread, do not block
         successor().enqueueEntry(entry, now);
     }
 }
index b7543410cd1a63128ac2da7ffde001cc9b3d778f..178a46cb0a271e80a7186f83a25f9242d363a133 100644 (file)
@@ -188,6 +188,7 @@ abstract class TransmitQueue {
      */
     final long enqueue(final ConnectionEntry entry, final long now) {
         if (successor != null) {
+            // This call will pay the enqueuing price, hence the caller does not have to
             successor.forwardEntry(entry, now);
             return 0;
         }
@@ -257,14 +258,14 @@ abstract class TransmitQueue {
         int count = 0;
         ConnectionEntry entry = inflight.poll();
         while (entry != null) {
-            successor.forwardEntry(entry, now);
+            successor.replayEntry(entry, now);
             entry = inflight.poll();
             count++;
         }
 
         entry = pending.poll();
         while (entry != null) {
-            successor.forwardEntry(entry, now);
+            successor.replayEntry(entry, now);
             entry = pending.poll();
             count++;
         }
index cb568647afe0b21bb5527cd1c8e6e1413c350be2..1eff70f6d9299ab9f782bbe1e8ed54dbca259f4d 100644 (file)
@@ -645,11 +645,11 @@ abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
             for (Object obj : successfulRequests) {
                 if (obj instanceof TransactionRequest) {
                     LOG.debug("Forwarding successful request {} to successor {}", obj, successor);
-                    successor.replayRequest((TransactionRequest<?>) obj, resp -> { }, now);
+                    successor.doReplayRequest((TransactionRequest<?>) obj, resp -> { }, now);
                 } else {
                     Verify.verify(obj instanceof IncrementSequence);
                     final IncrementSequence increment = (IncrementSequence) obj;
-                    successor.replayRequest(new IncrementTransactionSequenceRequest(getIdentifier(),
+                    successor.doReplayRequest(new IncrementTransactionSequenceRequest(getIdentifier(),
                         increment.getSequence(), localActor(), isSnapshotOnly(), increment.getDelta()), resp -> { },
                         now);
                     LOG.debug("Incrementing sequence {} to successor {}", obj, successor);
@@ -668,7 +668,7 @@ abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
             if (getIdentifier().equals(req.getTarget())) {
                 Verify.verify(req instanceof TransactionRequest, "Unhandled request %s", req);
                 LOG.debug("Replaying queued request {} to successor {}", req, successor);
-                successor.replayRequest((TransactionRequest<?>) req, e.getCallback(), e.getEnqueuedTicks());
+                successor.doReplayRequest((TransactionRequest<?>) req, e.getCallback(), e.getEnqueuedTicks());
                 it.remove();
             }
         }
@@ -696,7 +696,7 @@ abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
      * @param callback Callback to be invoked once the request completes
      * @param enqueuedTicks ticker-based time stamp when the request was enqueued
      */
-    private void replayRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback,
+    private void doReplayRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback,
             final long enqueuedTicks) {
         if (request instanceof AbstractLocalTransactionRequest) {
             handleReplayedLocalRequest((AbstractLocalTransactionRequest<?>) request, callback, enqueuedTicks);
@@ -736,6 +736,11 @@ abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
         }
     }
 
+    final void replayRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback,
+            final long enqueuedTicks) {
+        getSuccessorState().getSuccessor().doReplayRequest(request, callback, enqueuedTicks);
+    }
+
     abstract boolean isSnapshotOnly();
 
     abstract void doDelete(YangInstanceIdentifier path);
index a518c551694cfe0f1a1d3827b5a8555d81d5bad3..26e346e77cb4e0a0a19b6c66b5f5903d141a4803 100644 (file)
@@ -57,6 +57,23 @@ final class BouncingReconnectForwarder extends ReconnectForwarder {
 
     @Override
     protected void forwardEntry(final ConnectionEntry entry, final long now) {
+        try {
+            findCohort(entry).forwardEntry(entry, this::sendToSuccessor);
+        } catch (RequestException e) {
+            entry.complete(entry.getRequest().toRequestFailure(e));
+        }
+    }
+
+    @Override
+    protected void replayEntry(final ConnectionEntry entry, final long now) {
+        try {
+            findCohort(entry).replayEntry(entry, this::replayToSuccessor);
+        } catch (RequestException e) {
+            entry.complete(entry.getRequest().toRequestFailure(e));
+        }
+    }
+
+    private ProxyReconnectCohort findCohort(final ConnectionEntry entry) throws CohortNotFoundException {
         final Request<? , ?> request = entry.getRequest();
 
         final LocalHistoryIdentifier historyId;
@@ -68,16 +85,12 @@ final class BouncingReconnectForwarder extends ReconnectForwarder {
             throw new IllegalArgumentException("Unhandled request " + request);
         }
 
-        try {
-            final ProxyReconnectCohort cohort = cohorts.get(historyId);
-            if (cohort == null) {
-                LOG.warn("Cohort for request {} not found, aborting it", request);
-                throw new CohortNotFoundException(historyId);
-            }
-
-            cohort.forwardRequest(request, entry.getCallback(), this::sendToSuccessor);
-        } catch (RequestException e) {
-            entry.complete(request.toRequestFailure(e));
+        final ProxyReconnectCohort cohort = cohorts.get(historyId);
+        if (cohort == null) {
+            LOG.warn("Cohort for request {} not found, aborting it", request);
+            throw new CohortNotFoundException(historyId);
         }
+
+        return cohort;
     }
-}
\ No newline at end of file
+}
index e75a2df4c0d6084ba6629ff2926357a7b6a8a0f1..e26e00fa13a98c42d4236cce29050a55f1f020a9 100644 (file)
@@ -19,7 +19,6 @@ import java.util.Optional;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
-import java.util.function.BiConsumer;
 import java.util.function.Consumer;
 import javax.annotation.concurrent.GuardedBy;
 import org.opendaylight.controller.cluster.access.client.AbstractClientConnection;
@@ -272,22 +271,34 @@ abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
         }
 
         @Override
-        void forwardRequest(final Request<?, ?> request, final Consumer<Response<?, ?>> callback,
-                final BiConsumer<Request<?, ?>, Consumer<Response<?, ?>>> forwardTo) throws RequestException {
-            // FIXME: do not use sendRequest() once we have throttling in place, as we have already waited the
-            //        period required to get into the queue.
+        void replayEntry(final ConnectionEntry entry, final Consumer<ConnectionEntry> replayTo)
+                throws RequestException {
+            final Request<?, ?> request = entry.getRequest();
             if (request instanceof TransactionRequest) {
-                forwardTransactionRequest((TransactionRequest<?>) request, callback);
+                lookupProxy(request).replayRequest((TransactionRequest<?>) request, entry.getCallback(),
+                    entry.getEnqueuedTicks());
             } else if (request instanceof LocalHistoryRequest) {
-                forwardTo.accept(request, callback);
+                replayTo.accept(entry);
             } else {
                 throw new IllegalArgumentException("Unhandled request " + request);
             }
         }
 
-        private void forwardTransactionRequest(final TransactionRequest<?> request,
-                final Consumer<Response<?, ?>> callback) throws RequestException {
+        @Override
+        void forwardEntry(final ConnectionEntry entry, final Consumer<ConnectionEntry> forwardTo)
+                throws RequestException {
+            final Request<?, ?> request = entry.getRequest();
+            if (request instanceof TransactionRequest) {
+                lookupProxy(request).forwardRequest((TransactionRequest<?>) request, entry.getCallback());
+            } else if (request instanceof LocalHistoryRequest) {
+                forwardTo.accept(entry);
+            } else {
+                throw new IllegalArgumentException("Unhandled request " + request);
+            }
+        }
 
+        private AbstractProxyTransaction lookupProxy(final Request<?, ?> request)
+                throws RequestReplayException {
             final AbstractProxyTransaction proxy;
             lock.lock();
             try {
@@ -295,11 +306,11 @@ abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
             } finally {
                 lock.unlock();
             }
-            if (proxy == null) {
-                throw new RequestReplayException("Failed to find proxy for %s", request);
+            if (proxy != null) {
+                return proxy;
             }
 
-            proxy.forwardRequest(request, callback);
+            throw new RequestReplayException("Failed to find proxy for %s", request);
         }
     }
 
index 14bf0645f77c0a5300a3440d0f79dea55e438da5..11e612c69d9cecfed8284de5b96bcd63899a583e 100644 (file)
@@ -8,13 +8,10 @@
 package org.opendaylight.controller.cluster.databroker.actors.dds;
 
 import java.util.Collection;
-import java.util.function.BiConsumer;
 import java.util.function.Consumer;
 import org.opendaylight.controller.cluster.access.client.ConnectionEntry;
 import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
-import org.opendaylight.controller.cluster.access.concepts.Request;
 import org.opendaylight.controller.cluster.access.concepts.RequestException;
-import org.opendaylight.controller.cluster.access.concepts.Response;
 import org.opendaylight.yangtools.concepts.Identifiable;
 
 abstract class ProxyReconnectCohort implements Identifiable<LocalHistoryIdentifier> {
@@ -23,6 +20,7 @@ abstract class ProxyReconnectCohort implements Identifiable<LocalHistoryIdentifi
 
     abstract ProxyHistory finishReconnect();
 
-    abstract void forwardRequest(Request<?, ?> request, Consumer<Response<?, ?>> callback,
-            BiConsumer<Request<?, ?>, Consumer<Response<?, ?>>> replayTo) throws RequestException;
+    abstract void replayEntry(ConnectionEntry entry, Consumer<ConnectionEntry> replayTo) throws RequestException;
+
+    abstract void forwardEntry(ConnectionEntry entry, Consumer<ConnectionEntry> forwardTo) throws RequestException;
 }