BUG-8494: fix throttling during reconnect 14/58214/2
authorRobert Varga <robert.varga@pantheon.tech>
Mon, 29 May 2017 21:58:07 +0000 (23:58 +0200)
committerRobert Varga <robert.varga@pantheon.tech>
Sun, 4 Jun 2017 15:32:26 +0000 (17:32 +0200)
ReconnectForwarder is called from differing code-paths: the one is
during replay when we are dealing with late requests (those which have
been waiting while we replaying), the other is subsequent user requests.

The first one should not be waiting on the queue, as the requests have
already entered it, hence have payed the cost of entry. The latter needs
to pay for entering the queue, as otherwise we do not exert backpressure.

This patch differentiates the two code paths, so they behave as they
should. Also add more debug information in timer paths.

Change-Id: I609be2332b13868ef1b9511399e2827d7f3d5b7d
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
(cherry picked from commit 851fb56fba015c9fee3f0f9235c5c631a492ce59)

opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/AbstractClientConnection.java
opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ReconnectForwarder.java
opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/SimpleReconnectForwarder.java
opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/TransmitQueue.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractProxyTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/BouncingReconnectForwarder.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ProxyHistory.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ProxyReconnectCohort.java

index 4425c43..9844225 100644 (file)
@@ -137,23 +137,7 @@ public abstract class AbstractClientConnection<T extends BackendInfo> {
      */
     public final void sendRequest(final Request<?, ?> request, final Consumer<Response<?, ?>> callback) {
         final long now = currentTime();
-        long delay = enqueueEntry(new ConnectionEntry(request, callback, now), now);
-        try {
-            if (delay >= DEBUG_DELAY_NANOS) {
-                if (delay > MAX_DELAY_NANOS) {
-                    LOG.info("Capping {} throttle delay from {} to {} seconds", this,
-                        TimeUnit.NANOSECONDS.toSeconds(delay), MAX_DELAY_SECONDS);
-                    delay = MAX_DELAY_NANOS;
-                }
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("Sleeping for {}ms", TimeUnit.NANOSECONDS.toMillis(delay));
-                }
-            }
-            TimeUnit.NANOSECONDS.sleep(delay);
-        } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-            LOG.debug("Interrupted after sleeping {}ns", e, currentTime() - now);
-        }
+        sendEntry(new ConnectionEntry(request, callback, now), now);
     }
 
     /**
@@ -173,6 +157,24 @@ public abstract class AbstractClientConnection<T extends BackendInfo> {
         enqueueEntry(new ConnectionEntry(request, callback, enqueuedTicks), currentTime());
     }
 
+    public final long enqueueEntry(final ConnectionEntry entry, final long now) {
+        lock.lock();
+        try {
+            final RequestException maybePoison = poisoned;
+            if (maybePoison != null) {
+                throw new IllegalStateException("Connection " + this + " has been poisoned", maybePoison);
+            }
+
+            if (queue.isEmpty()) {
+                // The queue is becoming non-empty, schedule a timer.
+                scheduleTimer(entry.getEnqueuedTicks() + REQUEST_TIMEOUT_NANOS - now);
+            }
+            return queue.enqueue(entry, now);
+        } finally {
+            lock.unlock();
+        }
+    }
+
     public abstract Optional<T> getBackendInfo();
 
     final Collection<ConnectionEntry> startReplay() {
@@ -209,21 +211,24 @@ public abstract class AbstractClientConnection<T extends BackendInfo> {
     abstract ClientActorBehavior<T> lockedReconnect(ClientActorBehavior<T> current,
             RequestException runtimeRequestException);
 
-    final long enqueueEntry(final ConnectionEntry entry, final long now) {
-        lock.lock();
+    final void sendEntry(final ConnectionEntry entry, final long now) {
+        long delay = enqueueEntry(entry, now);
         try {
-            final RequestException maybePoison = poisoned;
-            if (maybePoison != null) {
-                throw new IllegalStateException("Connection " + this + " has been poisoned", maybePoison);
-            }
-
-            if (queue.isEmpty()) {
-                // The queue is becoming non-empty, schedule a timer.
-                scheduleTimer(entry.getEnqueuedTicks() + REQUEST_TIMEOUT_NANOS - now);
+            if (delay >= DEBUG_DELAY_NANOS) {
+                if (delay > MAX_DELAY_NANOS) {
+                    LOG.info("Capping {} throttle delay from {} to {} seconds", this,
+                        TimeUnit.NANOSECONDS.toSeconds(delay), MAX_DELAY_SECONDS);
+                    delay = MAX_DELAY_NANOS;
+                }
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("{}: Sleeping for {}ms on connection {}", context.persistenceId(),
+                        TimeUnit.NANOSECONDS.toMillis(delay), this);
+                }
             }
-            return queue.enqueue(entry, now);
-        } finally {
-            lock.unlock();
+            TimeUnit.NANOSECONDS.sleep(delay);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            LOG.debug("Interrupted after sleeping {}ns", e, currentTime() - now);
         }
     }
 
@@ -244,11 +249,11 @@ public abstract class AbstractClientConnection<T extends BackendInfo> {
     @GuardedBy("lock")
     private void scheduleTimer(final long delay) {
         if (haveTimer) {
-            LOG.debug("{}: timer already scheduled", context.persistenceId());
+            LOG.debug("{}: timer already scheduled on {}", context.persistenceId(), this);
             return;
         }
         if (queue.hasSuccessor()) {
-            LOG.debug("{}: connection has successor, not scheduling timer", context.persistenceId());
+            LOG.debug("{}: connection {} has a successor, not scheduling timer", context.persistenceId(), this);
             return;
         }
 
@@ -257,7 +262,7 @@ public abstract class AbstractClientConnection<T extends BackendInfo> {
         final long normalized =  delay <= 0 ? 0 : Math.min(delay, BACKEND_ALIVE_TIMEOUT_NANOS);
 
         final FiniteDuration dur = FiniteDuration.fromNanos(normalized);
-        LOG.debug("{}: scheduling timeout in {}", context.persistenceId(), dur);
+        LOG.debug("{}: connection {} scheduling timeout in {}", context.persistenceId(), this, dur);
         context.executeInActor(this::runTimer, dur);
         haveTimer = true;
     }
@@ -277,6 +282,9 @@ public abstract class AbstractClientConnection<T extends BackendInfo> {
         try {
             haveTimer = false;
             final long now = currentTime();
+
+            LOG.debug("{}: running timer on {}", context.persistenceId(), this);
+
             // The following line is only reliable when queue is not forwarding, but such state should not last long.
             // FIXME: BUG-8422: this may not be accurate w.r.t. replayed entries
             final long ticksSinceProgress = queue.ticksStalling(now);
@@ -295,6 +303,7 @@ public abstract class AbstractClientConnection<T extends BackendInfo> {
             delay = lockedCheckTimeout(now);
             if (delay == null) {
                 // We have timed out. There is no point in scheduling a timer
+                LOG.debug("{}: connection {} timed out", context.persistenceId(), this);
                 return lockedReconnect(current, new RuntimeRequestException("Backend connection timed out",
                     new TimeoutException()));
             }
@@ -302,6 +311,8 @@ public abstract class AbstractClientConnection<T extends BackendInfo> {
             if (delay.isPresent()) {
                 // If there is new delay, schedule a timer
                 scheduleTimer(delay.get());
+            } else {
+                LOG.debug("{}: not scheduling timeout on {}", context.persistenceId(), this);
             }
         } finally {
             lock.unlock();
@@ -335,13 +346,14 @@ public abstract class AbstractClientConnection<T extends BackendInfo> {
     @GuardedBy("lock")
     private Optional<Long> lockedCheckTimeout(final long now) {
         if (queue.isEmpty()) {
+            LOG.debug("{}: connection {} is empty", context.persistenceId(), this);
             return Optional.empty();
         }
 
         final long backendSilentTicks = backendSilentTicks(now);
         if (backendSilentTicks >= BACKEND_ALIVE_TIMEOUT_NANOS) {
-            LOG.debug("Connection {} has not seen activity from backend for {} nanoseconds, timing out", this,
-                backendSilentTicks);
+            LOG.debug("{}: Connection {} has not seen activity from backend for {} nanoseconds, timing out",
+                context.persistenceId(), this, backendSilentTicks);
             return null;
         }
 
@@ -354,7 +366,7 @@ public abstract class AbstractClientConnection<T extends BackendInfo> {
 
             tasksTimedOut++;
             queue.remove(now);
-            LOG.debug("Connection {} timed out entryt {}", this, head);
+            LOG.debug("{}: Connection {} timed out entry {}", context.persistenceId(), this, head);
             head.complete(head.getRequest().toRequestFailure(
                 new RequestTimeoutException("Timed out after " + beenOpen + "ns")));
         }
index 25e5d6e..58c9e75 100644 (file)
@@ -8,9 +8,6 @@
 package org.opendaylight.controller.cluster.access.client;
 
 import com.google.common.base.Preconditions;
-import java.util.function.Consumer;
-import org.opendaylight.controller.cluster.access.concepts.Request;
-import org.opendaylight.controller.cluster.access.concepts.Response;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -29,12 +26,18 @@ public abstract class ReconnectForwarder {
         this.successor = Preconditions.checkNotNull(successor);
     }
 
-    protected final void sendToSuccessor(final Request<?, ?> request, final Consumer<Response<?, ?>> callback) {
-        successor.sendRequest(request, callback);
+    protected final void sendToSuccessor(final ConnectionEntry entry) {
+        successor.sendRequest(entry.getRequest(), entry.getCallback());
+    }
+
+    protected final void replayToSuccessor(final ConnectionEntry entry) {
+        successor.enqueueRequest(entry.getRequest(), entry.getCallback(), entry.getEnqueuedTicks());
     }
 
     protected abstract void forwardEntry(ConnectionEntry entry, long now);
 
+    protected abstract void replayEntry(ConnectionEntry entry, long now);
+
     final AbstractReceivingClientConnection<?> successor() {
         return successor;
     }
index 2def9a1..90ec49e 100644 (file)
@@ -15,7 +15,12 @@ final class SimpleReconnectForwarder extends ReconnectForwarder {
 
     @Override
     protected void forwardEntry(final ConnectionEntry entry, final long now) {
-        // We are ignoring requested delay, as we have already paid the admission delay
+        successor().sendEntry(entry, now);
+    }
+
+    @Override
+    protected void replayEntry(final ConnectionEntry entry, final long now) {
+        // We are executing in the context of the client thread, do not block
         successor().enqueueEntry(entry, now);
     }
 }
index b754341..178a46c 100644 (file)
@@ -188,6 +188,7 @@ abstract class TransmitQueue {
      */
     final long enqueue(final ConnectionEntry entry, final long now) {
         if (successor != null) {
+            // This call will pay the enqueuing price, hence the caller does not have to
             successor.forwardEntry(entry, now);
             return 0;
         }
@@ -257,14 +258,14 @@ abstract class TransmitQueue {
         int count = 0;
         ConnectionEntry entry = inflight.poll();
         while (entry != null) {
-            successor.forwardEntry(entry, now);
+            successor.replayEntry(entry, now);
             entry = inflight.poll();
             count++;
         }
 
         entry = pending.poll();
         while (entry != null) {
-            successor.forwardEntry(entry, now);
+            successor.replayEntry(entry, now);
             entry = pending.poll();
             count++;
         }
index cb56864..1eff70f 100644 (file)
@@ -645,11 +645,11 @@ abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
             for (Object obj : successfulRequests) {
                 if (obj instanceof TransactionRequest) {
                     LOG.debug("Forwarding successful request {} to successor {}", obj, successor);
-                    successor.replayRequest((TransactionRequest<?>) obj, resp -> { }, now);
+                    successor.doReplayRequest((TransactionRequest<?>) obj, resp -> { }, now);
                 } else {
                     Verify.verify(obj instanceof IncrementSequence);
                     final IncrementSequence increment = (IncrementSequence) obj;
-                    successor.replayRequest(new IncrementTransactionSequenceRequest(getIdentifier(),
+                    successor.doReplayRequest(new IncrementTransactionSequenceRequest(getIdentifier(),
                         increment.getSequence(), localActor(), isSnapshotOnly(), increment.getDelta()), resp -> { },
                         now);
                     LOG.debug("Incrementing sequence {} to successor {}", obj, successor);
@@ -668,7 +668,7 @@ abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
             if (getIdentifier().equals(req.getTarget())) {
                 Verify.verify(req instanceof TransactionRequest, "Unhandled request %s", req);
                 LOG.debug("Replaying queued request {} to successor {}", req, successor);
-                successor.replayRequest((TransactionRequest<?>) req, e.getCallback(), e.getEnqueuedTicks());
+                successor.doReplayRequest((TransactionRequest<?>) req, e.getCallback(), e.getEnqueuedTicks());
                 it.remove();
             }
         }
@@ -696,7 +696,7 @@ abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
      * @param callback Callback to be invoked once the request completes
      * @param enqueuedTicks ticker-based time stamp when the request was enqueued
      */
-    private void replayRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback,
+    private void doReplayRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback,
             final long enqueuedTicks) {
         if (request instanceof AbstractLocalTransactionRequest) {
             handleReplayedLocalRequest((AbstractLocalTransactionRequest<?>) request, callback, enqueuedTicks);
@@ -736,6 +736,11 @@ abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
         }
     }
 
+    final void replayRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback,
+            final long enqueuedTicks) {
+        getSuccessorState().getSuccessor().doReplayRequest(request, callback, enqueuedTicks);
+    }
+
     abstract boolean isSnapshotOnly();
 
     abstract void doDelete(YangInstanceIdentifier path);
index a518c55..26e346e 100644 (file)
@@ -57,6 +57,23 @@ final class BouncingReconnectForwarder extends ReconnectForwarder {
 
     @Override
     protected void forwardEntry(final ConnectionEntry entry, final long now) {
+        try {
+            findCohort(entry).forwardEntry(entry, this::sendToSuccessor);
+        } catch (RequestException e) {
+            entry.complete(entry.getRequest().toRequestFailure(e));
+        }
+    }
+
+    @Override
+    protected void replayEntry(final ConnectionEntry entry, final long now) {
+        try {
+            findCohort(entry).replayEntry(entry, this::replayToSuccessor);
+        } catch (RequestException e) {
+            entry.complete(entry.getRequest().toRequestFailure(e));
+        }
+    }
+
+    private ProxyReconnectCohort findCohort(final ConnectionEntry entry) throws CohortNotFoundException {
         final Request<? , ?> request = entry.getRequest();
 
         final LocalHistoryIdentifier historyId;
@@ -68,16 +85,12 @@ final class BouncingReconnectForwarder extends ReconnectForwarder {
             throw new IllegalArgumentException("Unhandled request " + request);
         }
 
-        try {
-            final ProxyReconnectCohort cohort = cohorts.get(historyId);
-            if (cohort == null) {
-                LOG.warn("Cohort for request {} not found, aborting it", request);
-                throw new CohortNotFoundException(historyId);
-            }
-
-            cohort.forwardRequest(request, entry.getCallback(), this::sendToSuccessor);
-        } catch (RequestException e) {
-            entry.complete(request.toRequestFailure(e));
+        final ProxyReconnectCohort cohort = cohorts.get(historyId);
+        if (cohort == null) {
+            LOG.warn("Cohort for request {} not found, aborting it", request);
+            throw new CohortNotFoundException(historyId);
         }
+
+        return cohort;
     }
-}
\ No newline at end of file
+}
index e75a2df..e26e00f 100644 (file)
@@ -19,7 +19,6 @@ import java.util.Optional;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
-import java.util.function.BiConsumer;
 import java.util.function.Consumer;
 import javax.annotation.concurrent.GuardedBy;
 import org.opendaylight.controller.cluster.access.client.AbstractClientConnection;
@@ -272,22 +271,34 @@ abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
         }
 
         @Override
-        void forwardRequest(final Request<?, ?> request, final Consumer<Response<?, ?>> callback,
-                final BiConsumer<Request<?, ?>, Consumer<Response<?, ?>>> forwardTo) throws RequestException {
-            // FIXME: do not use sendRequest() once we have throttling in place, as we have already waited the
-            //        period required to get into the queue.
+        void replayEntry(final ConnectionEntry entry, final Consumer<ConnectionEntry> replayTo)
+                throws RequestException {
+            final Request<?, ?> request = entry.getRequest();
             if (request instanceof TransactionRequest) {
-                forwardTransactionRequest((TransactionRequest<?>) request, callback);
+                lookupProxy(request).replayRequest((TransactionRequest<?>) request, entry.getCallback(),
+                    entry.getEnqueuedTicks());
             } else if (request instanceof LocalHistoryRequest) {
-                forwardTo.accept(request, callback);
+                replayTo.accept(entry);
             } else {
                 throw new IllegalArgumentException("Unhandled request " + request);
             }
         }
 
-        private void forwardTransactionRequest(final TransactionRequest<?> request,
-                final Consumer<Response<?, ?>> callback) throws RequestException {
+        @Override
+        void forwardEntry(final ConnectionEntry entry, final Consumer<ConnectionEntry> forwardTo)
+                throws RequestException {
+            final Request<?, ?> request = entry.getRequest();
+            if (request instanceof TransactionRequest) {
+                lookupProxy(request).forwardRequest((TransactionRequest<?>) request, entry.getCallback());
+            } else if (request instanceof LocalHistoryRequest) {
+                forwardTo.accept(entry);
+            } else {
+                throw new IllegalArgumentException("Unhandled request " + request);
+            }
+        }
 
+        private AbstractProxyTransaction lookupProxy(final Request<?, ?> request)
+                throws RequestReplayException {
             final AbstractProxyTransaction proxy;
             lock.lock();
             try {
@@ -295,11 +306,11 @@ abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
             } finally {
                 lock.unlock();
             }
-            if (proxy == null) {
-                throw new RequestReplayException("Failed to find proxy for %s", request);
+            if (proxy != null) {
+                return proxy;
             }
 
-            proxy.forwardRequest(request, callback);
+            throw new RequestReplayException("Failed to find proxy for %s", request);
         }
     }
 
index 14bf064..11e612c 100644 (file)
@@ -8,13 +8,10 @@
 package org.opendaylight.controller.cluster.databroker.actors.dds;
 
 import java.util.Collection;
-import java.util.function.BiConsumer;
 import java.util.function.Consumer;
 import org.opendaylight.controller.cluster.access.client.ConnectionEntry;
 import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
-import org.opendaylight.controller.cluster.access.concepts.Request;
 import org.opendaylight.controller.cluster.access.concepts.RequestException;
-import org.opendaylight.controller.cluster.access.concepts.Response;
 import org.opendaylight.yangtools.concepts.Identifiable;
 
 abstract class ProxyReconnectCohort implements Identifiable<LocalHistoryIdentifier> {
@@ -23,6 +20,7 @@ abstract class ProxyReconnectCohort implements Identifiable<LocalHistoryIdentifi
 
     abstract ProxyHistory finishReconnect();
 
-    abstract void forwardRequest(Request<?, ?> request, Consumer<Response<?, ?>> callback,
-            BiConsumer<Request<?, ?>, Consumer<Response<?, ?>>> replayTo) throws RequestException;
+    abstract void replayEntry(ConnectionEntry entry, Consumer<ConnectionEntry> replayTo) throws RequestException;
+
+    abstract void forwardEntry(ConnectionEntry entry, Consumer<ConnectionEntry> forwardTo) throws RequestException;
 }