BUG-8422: separate retry and request timeouts 75/57375/1
authorRobert Varga <robert.varga@pantheon.tech>
Thu, 11 May 2017 14:54:22 +0000 (16:54 +0200)
committerRobert Varga <robert.varga@pantheon.tech>
Thu, 18 May 2017 15:32:18 +0000 (17:32 +0200)
This patch corrects a thinko around request timeouts, where we
reconnect the connection based on request timeout, not based on
the 'try' timeout.

The difference between the two is that the 'try' timeout is the
period we allow the backend to respond to our request and when
it does not, we reconnect the connection.

Change-Id: I8c00a80e5c26c5b829056c43fe78a0567041bc5e
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
Signed-off-by: Tomas Cere <tcere@cisco.com>
(cherry picked from commit f32b44f6e2dac23938a2c01638872c65ba1237f5)

opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/AbstractClientConnection.java
opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ProgressTracker.java
opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/RequestTimeoutException.java [new file with mode: 0644]
opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/TransmitQueue.java
opendaylight/md-sal/cds-access-client/src/test/java/org/opendaylight/controller/cluster/access/client/AbstractClientConnectionTest.java
opendaylight/md-sal/cds-access-client/src/test/java/org/opendaylight/controller/cluster/access/client/ConnectingClientConnectionTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/resources/simplelogger.properties

index d37893b..4dfe43b 100644 (file)
@@ -40,14 +40,31 @@ import scala.concurrent.duration.FiniteDuration;
 public abstract class AbstractClientConnection<T extends BackendInfo> {
     private static final Logger LOG = LoggerFactory.getLogger(AbstractClientConnection.class);
 
-    // Keep these constants in nanoseconds, as that prevents unnecessary conversions in the fast path
+    /*
+     * Timers involved in communication with the backend. There are three tiers which are spaced out to allow for
+     * recovery at each tier. Keep these constants in nanoseconds, as that prevents unnecessary conversions in the fast
+     * path.
+     */
+    /**
+     * Backend aliveness timer. This is reset whenever we receive a response from the backend and kept armed whenever
+     * we have an outstanding request. If when this time expires, we tear down this connection and attept to reconnect
+     * it.
+     */
     @VisibleForTesting
-    static final long NO_PROGRESS_TIMEOUT_NANOS = TimeUnit.MINUTES.toNanos(15);
+    static final long BACKEND_ALIVE_TIMEOUT_NANOS = TimeUnit.SECONDS.toNanos(30);
+
+    /**
+     * Request timeout. If the request fails to complete within this time since it was originally enqueued, we time
+     * the request out.
+     */
     @VisibleForTesting
-    static final long REQUEST_TIMEOUT_NANOS = TimeUnit.SECONDS.toNanos(30);
+    static final long REQUEST_TIMEOUT_NANOS = TimeUnit.MINUTES.toNanos(2);
 
-    private static final FiniteDuration REQUEST_TIMEOUT_DURATION = FiniteDuration.apply(REQUEST_TIMEOUT_NANOS,
-        TimeUnit.NANOSECONDS);
+    /**
+     * No progress timeout. A client fails to make any forward progress in this time, it will terminate itself.
+     */
+    @VisibleForTesting
+    static final long NO_PROGRESS_TIMEOUT_NANOS = TimeUnit.MINUTES.toNanos(15);
 
     private final Lock lock = new ReentrantLock();
     private final ClientActorContext context;
@@ -58,6 +75,11 @@ public abstract class AbstractClientConnection<T extends BackendInfo> {
     @GuardedBy("lock")
     private boolean haveTimer;
 
+    /**
+     * Time reference when we saw any activity from the backend.
+     */
+    private long lastReceivedTicks;
+
     private volatile RequestException poisoned;
 
     // Do not allow subclassing outside of this package
@@ -66,6 +88,7 @@ public abstract class AbstractClientConnection<T extends BackendInfo> {
         this.context = Preconditions.checkNotNull(context);
         this.cookie = Preconditions.checkNotNull(cookie);
         this.queue = Preconditions.checkNotNull(queue);
+        this.lastReceivedTicks = currentTime();
     }
 
     // Do not allow subclassing outside of this package
@@ -73,6 +96,7 @@ public abstract class AbstractClientConnection<T extends BackendInfo> {
         this.context = oldConnection.context;
         this.cookie = oldConnection.cookie;
         this.queue = new TransmitQueue.Halted(targetQueueSize);
+        this.lastReceivedTicks = oldConnection.lastReceivedTicks;
     }
 
     public final ClientActorContext context() {
@@ -159,8 +183,8 @@ public abstract class AbstractClientConnection<T extends BackendInfo> {
             }
 
             if (queue.isEmpty()) {
-                // The queue is becoming non-empty, schedule a timer
-                scheduleTimer(REQUEST_TIMEOUT_DURATION);
+                // The queue is becoming non-empty, schedule a timer.
+                scheduleTimer(entry.getEnqueuedTicks() + REQUEST_TIMEOUT_NANOS - now);
             }
             return queue.enqueue(entry, now);
         } finally {
@@ -183,7 +207,7 @@ public abstract class AbstractClientConnection<T extends BackendInfo> {
      * @param delay Delay, in nanoseconds
      */
     @GuardedBy("lock")
-    private void scheduleTimer(final FiniteDuration delay) {
+    private void scheduleTimer(final long delay) {
         if (haveTimer) {
             LOG.debug("{}: timer already scheduled", context.persistenceId());
             return;
@@ -192,8 +216,14 @@ public abstract class AbstractClientConnection<T extends BackendInfo> {
             LOG.debug("{}: connection has successor, not scheduling timer", context.persistenceId());
             return;
         }
-        LOG.debug("{}: scheduling timeout in {}", context.persistenceId(), delay);
-        context.executeInActor(this::runTimer, delay);
+
+        // If the delay is negative, we need to schedule an action immediately. While the caller could have checked
+        // for that condition and take appropriate action, but this is more convenient and less error-prone.
+        final long normalized =  delay <= 0 ? 0 : Math.min(delay, BACKEND_ALIVE_TIMEOUT_NANOS);
+
+        final FiniteDuration dur = FiniteDuration.fromNanos(normalized);
+        LOG.debug("{}: scheduling timeout in {}", context.persistenceId(), dur);
+        context.executeInActor(this::runTimer, dur);
         haveTimer = true;
     }
 
@@ -206,13 +236,14 @@ public abstract class AbstractClientConnection<T extends BackendInfo> {
      */
     @VisibleForTesting
     final ClientActorBehavior<T> runTimer(final ClientActorBehavior<T> current) {
-        final Optional<FiniteDuration> delay;
+        final Optional<Long> delay;
 
         lock.lock();
         try {
             haveTimer = false;
             final long now = currentTime();
             // The following line is only reliable when queue is not forwarding, but such state should not last long.
+            // FIXME: BUG-8422: this may not be accurate w.r.t. replayed entries
             final long ticksSinceProgress = queue.ticksStalling(now);
             if (ticksSinceProgress >= NO_PROGRESS_TIMEOUT_NANOS) {
                 LOG.error("Queue {} has not seen progress in {} seconds, failing all requests", this,
@@ -244,7 +275,7 @@ public abstract class AbstractClientConnection<T extends BackendInfo> {
     }
 
     @VisibleForTesting
-    final Optional<FiniteDuration> checkTimeout(final long now) {
+    final Optional<Long> checkTimeout(final long now) {
         lock.lock();
         try {
             return lockedCheckTimeout(now);
@@ -262,19 +293,38 @@ public abstract class AbstractClientConnection<T extends BackendInfo> {
     @SuppressFBWarnings(value = "NP_OPTIONAL_RETURN_NULL",
             justification = "Returning null Optional is documented in the API contract.")
     @GuardedBy("lock")
-    private Optional<FiniteDuration> lockedCheckTimeout(final long now) {
-        final ConnectionEntry head = queue.peek();
-        if (head == null) {
+    private Optional<Long> lockedCheckTimeout(final long now) {
+        if (queue.isEmpty()) {
             return Optional.empty();
         }
 
-        final long beenOpen = now - head.getEnqueuedTicks();
-        if (beenOpen >= REQUEST_TIMEOUT_NANOS) {
-            LOG.debug("Connection {} has a request not completed for {} nanoseconds, timing out", this, beenOpen);
+        final long backendSilentTicks = now - lastReceivedTicks;
+        if (backendSilentTicks >= BACKEND_ALIVE_TIMEOUT_NANOS) {
+            LOG.debug("Connection {} has not seen activity from backend for {} nanoseconds, timing out", this,
+                backendSilentTicks);
             return null;
         }
 
-        return Optional.of(FiniteDuration.apply(REQUEST_TIMEOUT_NANOS - beenOpen, TimeUnit.NANOSECONDS));
+        int tasksTimedOut = 0;
+        for (ConnectionEntry head = queue.peek(); head != null; head = queue.peek()) {
+            final long beenOpen = now - head.getEnqueuedTicks();
+            if (beenOpen < REQUEST_TIMEOUT_NANOS) {
+                return Optional.of(REQUEST_TIMEOUT_NANOS - beenOpen);
+            }
+
+            tasksTimedOut++;
+            queue.remove(now);
+            LOG.debug("Connection {} timed out entryt {}", this, head);
+            head.complete(head.getRequest().toRequestFailure(
+                new RequestTimeoutException("Timed out after " + beenOpen + "ns")));
+        }
+
+        LOG.debug("Connection {} timed out {} tasks", this, tasksTimedOut);
+        if (tasksTimedOut != 0) {
+            queue.tryTransmit(now);
+        }
+
+        return Optional.empty();
     }
 
     final void poison(final RequestException cause) {
@@ -299,6 +349,7 @@ public abstract class AbstractClientConnection<T extends BackendInfo> {
 
     final void receiveResponse(final ResponseEnvelope<?> envelope) {
         final long now = currentTime();
+        lastReceivedTicks = now;
 
         final Optional<TransmittedConnectionEntry> maybeEntry;
         lock.lock();
index 2a24077..699c102 100644 (file)
@@ -272,9 +272,9 @@ abstract class ProgressTracker {
      * This call can empty the collection of open tasks, that special case should be handled.
      *
      * @param now tick number corresponding to caller's present
-     * @param enqueuedTicks see TransitQueue#recordCompletion
-     * @param transmitTicks see TransitQueue#recordCompletion
-     * @param execNanos see TransitQueue#recordCompletion
+     * @param enqueuedTicks see TransmitQueue#recordCompletion
+     * @param transmitTicks see TransmitQueue#recordCompletion
+     * @param execNanos see TransmitQueue#recordCompletion
      */
     protected void protectedCloseTask(final long now, final long enqueuedTicks, final long transmitTicks,
                 final long execNanos) {
diff --git a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/RequestTimeoutException.java b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/RequestTimeoutException.java
new file mode 100644 (file)
index 0000000..0ddbc07
--- /dev/null
@@ -0,0 +1,23 @@
+/*
+ * Copyright (c) 2017 Pantheon Technologies, s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.access.client;
+
+import org.opendaylight.controller.cluster.access.concepts.RequestException;
+
+public final class RequestTimeoutException extends RequestException {
+    private static final long serialVersionUID = 1L;
+
+    public RequestTimeoutException(final String message) {
+        super(message);
+    }
+
+    @Override
+    public boolean isRetriable() {
+        return false;
+    }
+}
index 15ad958..b2497fc 100644 (file)
@@ -135,12 +135,16 @@ abstract class TransmitQueue {
         tracker.closeTask(now, entry.getEnqueuedTicks(), entry.getTxTicks(), envelope.getExecutionTimeNanos());
 
         // We have freed up a slot, try to transmit something
+        tryTransmit(now);
+
+        return Optional.of(entry);
+    }
+
+    final void tryTransmit(final long now) {
         final int toSend = canTransmitCount(inflight.size());
         if (toSend > 0 && !pending.isEmpty()) {
             transmitEntries(toSend, now);
         }
-
-        return Optional.of(entry);
     }
 
     private void transmitEntries(final int maxTransmit, final long now) {
@@ -246,6 +250,16 @@ abstract class TransmitQueue {
         }
     }
 
+    final void remove(final long now) {
+        final TransmittedConnectionEntry txe = inflight.poll();
+        if (txe == null) {
+            final ConnectionEntry entry = pending.pop();
+            tracker.closeTask(now, entry.getEnqueuedTicks(), 0, 0);
+        } else {
+            tracker.closeTask(now, txe.getEnqueuedTicks(), txe.getTxTicks(), 0);
+        }
+    }
+
     @VisibleForTesting
     Deque<TransmittedConnectionEntry> getInflight() {
         return inflight;
@@ -318,5 +332,4 @@ abstract class TransmitQueue {
         }
         queue.clear();
     }
-
 }
index 550dd9f..6a833a8 100644 (file)
@@ -43,7 +43,6 @@ import org.opendaylight.controller.cluster.access.concepts.ResponseEnvelope;
 import org.opendaylight.controller.cluster.access.concepts.RuntimeRequestException;
 import org.opendaylight.controller.cluster.access.concepts.SuccessEnvelope;
 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
-import scala.concurrent.duration.FiniteDuration;
 
 public abstract class AbstractClientConnectionTest<T extends AbstractClientConnection<U>, U extends BackendInfo> {
 
@@ -115,7 +114,7 @@ public abstract class AbstractClientConnectionTest<T extends AbstractClientConne
 
     @Test
     public void testCheckTimeoutEmptyQueue() throws Exception {
-        final Optional<FiniteDuration> timeout = connection.checkTimeout(context.ticker().read());
+        final Optional<Long> timeout = connection.checkTimeout(context.ticker().read());
         Assert.assertFalse(timeout.isPresent());
     }
 
@@ -123,8 +122,8 @@ public abstract class AbstractClientConnectionTest<T extends AbstractClientConne
     public void testCheckTimeoutConnectionTimeouted() throws Exception {
         final Consumer<Response<?, ?>> callback = mock(Consumer.class);
         connection.sendRequest(createRequest(replyToProbe.ref()), callback);
-        final long now = context.ticker().read() + ConnectedClientConnection.REQUEST_TIMEOUT_NANOS;
-        final Optional<FiniteDuration> timeout = connection.checkTimeout(now);
+        final long now = context.ticker().read() + ConnectedClientConnection.BACKEND_ALIVE_TIMEOUT_NANOS;
+        final Optional<Long> timeout = connection.checkTimeout(now);
         Assert.assertNull(timeout);
     }
 
@@ -133,7 +132,7 @@ public abstract class AbstractClientConnectionTest<T extends AbstractClientConne
         final Consumer<Response<?, ?>> callback = mock(Consumer.class);
         connection.sendRequest(createRequest(replyToProbe.ref()), callback);
         final long now = context.ticker().read();
-        final Optional<FiniteDuration> timeout = connection.checkTimeout(now);
+        final Optional<Long> timeout = connection.checkTimeout(now);
         Assert.assertTrue(timeout.isPresent());
     }
 
index 8274d24..b436429 100644 (file)
@@ -191,7 +191,7 @@ public class ConnectingClientConnectionTest {
     @Test
     public void testSendRequestNeedsBackend() {
         queue.sendRequest(mockRequest, mockCallback);
-        final Optional<FiniteDuration> ret = queue.checkTimeout(ticker.read());
+        final Optional<Long> ret = queue.checkTimeout(ticker.read());
         assertNotNull(ret);
         assertTrue(ret.isPresent());
     }
@@ -207,7 +207,7 @@ public class ConnectingClientConnectionTest {
         setupBackend();
 
         queue.sendRequest(mockRequest, mockCallback);
-        final Optional<FiniteDuration> ret = queue.checkTimeout(ticker.read());
+        final Optional<Long> ret = queue.checkTimeout(ticker.read());
         assertNotNull(ret);
         assertTrue(ret.isPresent());
         assertTransmit(mockRequest, 0);
@@ -215,7 +215,7 @@ public class ConnectingClientConnectionTest {
 
     @Test
     public void testRunTimeoutEmpty() throws NoProgressException {
-        Optional<FiniteDuration> ret = queue.checkTimeout(ticker.read());
+        Optional<Long> ret = queue.checkTimeout(ticker.read());
         assertNotNull(ret);
         assertFalse(ret.isPresent());
     }
@@ -223,7 +223,7 @@ public class ConnectingClientConnectionTest {
     @Test
     public void testRunTimeoutWithoutShift() throws NoProgressException {
         queue.sendRequest(mockRequest, mockCallback);
-        Optional<FiniteDuration> ret = queue.checkTimeout(ticker.read());
+        Optional<Long> ret = queue.checkTimeout(ticker.read());
         assertNotNull(ret);
         assertTrue(ret.isPresent());
     }
@@ -232,9 +232,9 @@ public class ConnectingClientConnectionTest {
     public void testRunTimeoutWithTimeoutLess() throws NoProgressException {
         queue.sendRequest(mockRequest, mockCallback);
 
-        ticker.advance(AbstractClientConnection.REQUEST_TIMEOUT_NANOS - 1);
+        ticker.advance(AbstractClientConnection.BACKEND_ALIVE_TIMEOUT_NANOS - 1);
 
-        Optional<FiniteDuration> ret = queue.checkTimeout(ticker.read());
+        Optional<Long> ret = queue.checkTimeout(ticker.read());
         assertNotNull(ret);
         assertTrue(ret.isPresent());
     }
@@ -245,9 +245,9 @@ public class ConnectingClientConnectionTest {
 
         queue.sendRequest(mockRequest, mockCallback);
 
-        ticker.advance(AbstractClientConnection.REQUEST_TIMEOUT_NANOS);
+        ticker.advance(AbstractClientConnection.BACKEND_ALIVE_TIMEOUT_NANOS);
 
-        Optional<FiniteDuration> ret = queue.checkTimeout(ticker.read());
+        Optional<Long> ret = queue.checkTimeout(ticker.read());
         assertNull(ret);
     }
 
@@ -257,9 +257,9 @@ public class ConnectingClientConnectionTest {
 
         queue.sendRequest(mockRequest, mockCallback);
 
-        ticker.advance(AbstractClientConnection.REQUEST_TIMEOUT_NANOS + 1);
+        ticker.advance(AbstractClientConnection.BACKEND_ALIVE_TIMEOUT_NANOS + 1);
 
-        Optional<FiniteDuration> ret = queue.checkTimeout(ticker.read());
+        Optional<Long> ret = queue.checkTimeout(ticker.read());
         assertNull(ret);
     }
 
@@ -290,7 +290,7 @@ public class ConnectingClientConnectionTest {
         ticker.advance(AbstractClientConnection.NO_PROGRESS_TIMEOUT_NANOS);
 
         // No problem
-        Optional<FiniteDuration> ret = queue.checkTimeout(ticker.read());
+        Optional<Long> ret = queue.checkTimeout(ticker.read());
         assertNotNull(ret);
         assertFalse(ret.isPresent());
     }
@@ -300,7 +300,7 @@ public class ConnectingClientConnectionTest {
         ticker.advance(AbstractClientConnection.NO_PROGRESS_TIMEOUT_NANOS + 1);
 
         // No problem
-        Optional<FiniteDuration> ret = queue.checkTimeout(ticker.read());
+        Optional<Long> ret = queue.checkTimeout(ticker.read());
         assertNotNull(ret);
         assertFalse(ret.isPresent());
     }
@@ -348,7 +348,7 @@ public class ConnectingClientConnectionTest {
 
         ticker.advance(AbstractClientConnection.NO_PROGRESS_TIMEOUT_NANOS - 11);
 
-        Optional<FiniteDuration> ret = queue.checkTimeout(ticker.read());
+        Optional<Long> ret = queue.checkTimeout(ticker.read());
         assertNull(ret);
     }
 
index c52e4a3..f58fd0d 100644 (file)
@@ -118,7 +118,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
     @Parameters(name = "{0}")
     public static Collection<Object[]> data() {
         return Arrays.asList(new Object[][] {
-                { DistributedDataStore.class, 7}, { ClientBackedDataStore.class, 60 }
+                { DistributedDataStore.class, 7}, { ClientBackedDataStore.class, 120 }
         });
     }
 
index 6cf580c..a9f623b 100644 (file)
@@ -9,3 +9,4 @@ org.slf4j.simpleLogger.log.org.opendaylight.controller.cluster.datastore=debug
 org.slf4j.simpleLogger.log.org.opendaylight.controller.cluster.databroker.actors.dds=debug
 org.slf4j.simpleLogger.log.org.opendaylight.controller.cluster.datastore.node.utils.stream=off
 org.slf4j.simpleLogger.log.org.opendaylight.controller.cluster.sharding=debug
+org.slf4j.simpleLogger.log.org.opendaylight.controller.cluster.access.client=debug

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.