From f32b44f6e2dac23938a2c01638872c65ba1237f5 Mon Sep 17 00:00:00 2001 From: Robert Varga Date: Thu, 11 May 2017 16:54:22 +0200 Subject: [PATCH] BUG-8422: separate retry and request timeouts This patch corrects a thinko around request timeouts, where we reconnect the connection based on request timeout, not based on the 'try' timeout. The difference between the two is that the 'try' timeout is the period we allow the backend to respond to our request and when it does not, we reconnect the connection. Change-Id: I8c00a80e5c26c5b829056c43fe78a0567041bc5e Signed-off-by: Robert Varga Signed-off-by: Tomas Cere --- .../client/AbstractClientConnection.java | 89 +++++++++++++++---- .../access/client/ProgressTracker.java | 6 +- .../client/RequestTimeoutException.java | 23 +++++ .../cluster/access/client/TransmitQueue.java | 19 +++- .../client/AbstractClientConnectionTest.java | 9 +- .../ConnectingClientConnectionTest.java | 26 +++--- ...butedDataStoreRemotingIntegrationTest.java | 2 +- .../test/resources/simplelogger.properties | 1 + 8 files changed, 131 insertions(+), 44 deletions(-) create mode 100644 opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/RequestTimeoutException.java diff --git a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/AbstractClientConnection.java b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/AbstractClientConnection.java index d37893bd7c..4dfe43b089 100644 --- a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/AbstractClientConnection.java +++ b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/AbstractClientConnection.java @@ -40,14 +40,31 @@ import scala.concurrent.duration.FiniteDuration; public abstract class AbstractClientConnection { private static final Logger LOG = LoggerFactory.getLogger(AbstractClientConnection.class); - // Keep these constants in nanoseconds, as that prevents unnecessary conversions in the fast path + /* + * Timers involved in communication with the backend. There are three tiers which are spaced out to allow for + * recovery at each tier. Keep these constants in nanoseconds, as that prevents unnecessary conversions in the fast + * path. + */ + /** + * Backend aliveness timer. This is reset whenever we receive a response from the backend and kept armed whenever + * we have an outstanding request. If when this time expires, we tear down this connection and attept to reconnect + * it. + */ @VisibleForTesting - static final long NO_PROGRESS_TIMEOUT_NANOS = TimeUnit.MINUTES.toNanos(15); + static final long BACKEND_ALIVE_TIMEOUT_NANOS = TimeUnit.SECONDS.toNanos(30); + + /** + * Request timeout. If the request fails to complete within this time since it was originally enqueued, we time + * the request out. + */ @VisibleForTesting - static final long REQUEST_TIMEOUT_NANOS = TimeUnit.SECONDS.toNanos(30); + static final long REQUEST_TIMEOUT_NANOS = TimeUnit.MINUTES.toNanos(2); - private static final FiniteDuration REQUEST_TIMEOUT_DURATION = FiniteDuration.apply(REQUEST_TIMEOUT_NANOS, - TimeUnit.NANOSECONDS); + /** + * No progress timeout. A client fails to make any forward progress in this time, it will terminate itself. + */ + @VisibleForTesting + static final long NO_PROGRESS_TIMEOUT_NANOS = TimeUnit.MINUTES.toNanos(15); private final Lock lock = new ReentrantLock(); private final ClientActorContext context; @@ -58,6 +75,11 @@ public abstract class AbstractClientConnection { @GuardedBy("lock") private boolean haveTimer; + /** + * Time reference when we saw any activity from the backend. + */ + private long lastReceivedTicks; + private volatile RequestException poisoned; // Do not allow subclassing outside of this package @@ -66,6 +88,7 @@ public abstract class AbstractClientConnection { this.context = Preconditions.checkNotNull(context); this.cookie = Preconditions.checkNotNull(cookie); this.queue = Preconditions.checkNotNull(queue); + this.lastReceivedTicks = currentTime(); } // Do not allow subclassing outside of this package @@ -73,6 +96,7 @@ public abstract class AbstractClientConnection { this.context = oldConnection.context; this.cookie = oldConnection.cookie; this.queue = new TransmitQueue.Halted(targetQueueSize); + this.lastReceivedTicks = oldConnection.lastReceivedTicks; } public final ClientActorContext context() { @@ -159,8 +183,8 @@ public abstract class AbstractClientConnection { } if (queue.isEmpty()) { - // The queue is becoming non-empty, schedule a timer - scheduleTimer(REQUEST_TIMEOUT_DURATION); + // The queue is becoming non-empty, schedule a timer. + scheduleTimer(entry.getEnqueuedTicks() + REQUEST_TIMEOUT_NANOS - now); } return queue.enqueue(entry, now); } finally { @@ -183,7 +207,7 @@ public abstract class AbstractClientConnection { * @param delay Delay, in nanoseconds */ @GuardedBy("lock") - private void scheduleTimer(final FiniteDuration delay) { + private void scheduleTimer(final long delay) { if (haveTimer) { LOG.debug("{}: timer already scheduled", context.persistenceId()); return; @@ -192,8 +216,14 @@ public abstract class AbstractClientConnection { LOG.debug("{}: connection has successor, not scheduling timer", context.persistenceId()); return; } - LOG.debug("{}: scheduling timeout in {}", context.persistenceId(), delay); - context.executeInActor(this::runTimer, delay); + + // If the delay is negative, we need to schedule an action immediately. While the caller could have checked + // for that condition and take appropriate action, but this is more convenient and less error-prone. + final long normalized = delay <= 0 ? 0 : Math.min(delay, BACKEND_ALIVE_TIMEOUT_NANOS); + + final FiniteDuration dur = FiniteDuration.fromNanos(normalized); + LOG.debug("{}: scheduling timeout in {}", context.persistenceId(), dur); + context.executeInActor(this::runTimer, dur); haveTimer = true; } @@ -206,13 +236,14 @@ public abstract class AbstractClientConnection { */ @VisibleForTesting final ClientActorBehavior runTimer(final ClientActorBehavior current) { - final Optional delay; + final Optional delay; lock.lock(); try { haveTimer = false; final long now = currentTime(); // The following line is only reliable when queue is not forwarding, but such state should not last long. + // FIXME: BUG-8422: this may not be accurate w.r.t. replayed entries final long ticksSinceProgress = queue.ticksStalling(now); if (ticksSinceProgress >= NO_PROGRESS_TIMEOUT_NANOS) { LOG.error("Queue {} has not seen progress in {} seconds, failing all requests", this, @@ -244,7 +275,7 @@ public abstract class AbstractClientConnection { } @VisibleForTesting - final Optional checkTimeout(final long now) { + final Optional checkTimeout(final long now) { lock.lock(); try { return lockedCheckTimeout(now); @@ -262,19 +293,38 @@ public abstract class AbstractClientConnection { @SuppressFBWarnings(value = "NP_OPTIONAL_RETURN_NULL", justification = "Returning null Optional is documented in the API contract.") @GuardedBy("lock") - private Optional lockedCheckTimeout(final long now) { - final ConnectionEntry head = queue.peek(); - if (head == null) { + private Optional lockedCheckTimeout(final long now) { + if (queue.isEmpty()) { return Optional.empty(); } - final long beenOpen = now - head.getEnqueuedTicks(); - if (beenOpen >= REQUEST_TIMEOUT_NANOS) { - LOG.debug("Connection {} has a request not completed for {} nanoseconds, timing out", this, beenOpen); + final long backendSilentTicks = now - lastReceivedTicks; + if (backendSilentTicks >= BACKEND_ALIVE_TIMEOUT_NANOS) { + LOG.debug("Connection {} has not seen activity from backend for {} nanoseconds, timing out", this, + backendSilentTicks); return null; } - return Optional.of(FiniteDuration.apply(REQUEST_TIMEOUT_NANOS - beenOpen, TimeUnit.NANOSECONDS)); + int tasksTimedOut = 0; + for (ConnectionEntry head = queue.peek(); head != null; head = queue.peek()) { + final long beenOpen = now - head.getEnqueuedTicks(); + if (beenOpen < REQUEST_TIMEOUT_NANOS) { + return Optional.of(REQUEST_TIMEOUT_NANOS - beenOpen); + } + + tasksTimedOut++; + queue.remove(now); + LOG.debug("Connection {} timed out entryt {}", this, head); + head.complete(head.getRequest().toRequestFailure( + new RequestTimeoutException("Timed out after " + beenOpen + "ns"))); + } + + LOG.debug("Connection {} timed out {} tasks", this, tasksTimedOut); + if (tasksTimedOut != 0) { + queue.tryTransmit(now); + } + + return Optional.empty(); } final void poison(final RequestException cause) { @@ -299,6 +349,7 @@ public abstract class AbstractClientConnection { final void receiveResponse(final ResponseEnvelope envelope) { final long now = currentTime(); + lastReceivedTicks = now; final Optional maybeEntry; lock.lock(); diff --git a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ProgressTracker.java b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ProgressTracker.java index 2a24077f6e..699c102297 100644 --- a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ProgressTracker.java +++ b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ProgressTracker.java @@ -272,9 +272,9 @@ abstract class ProgressTracker { * This call can empty the collection of open tasks, that special case should be handled. * * @param now tick number corresponding to caller's present - * @param enqueuedTicks see TransitQueue#recordCompletion - * @param transmitTicks see TransitQueue#recordCompletion - * @param execNanos see TransitQueue#recordCompletion + * @param enqueuedTicks see TransmitQueue#recordCompletion + * @param transmitTicks see TransmitQueue#recordCompletion + * @param execNanos see TransmitQueue#recordCompletion */ protected void protectedCloseTask(final long now, final long enqueuedTicks, final long transmitTicks, final long execNanos) { diff --git a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/RequestTimeoutException.java b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/RequestTimeoutException.java new file mode 100644 index 0000000000..0ddbc0721a --- /dev/null +++ b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/RequestTimeoutException.java @@ -0,0 +1,23 @@ +/* + * Copyright (c) 2017 Pantheon Technologies, s.r.o. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.access.client; + +import org.opendaylight.controller.cluster.access.concepts.RequestException; + +public final class RequestTimeoutException extends RequestException { + private static final long serialVersionUID = 1L; + + public RequestTimeoutException(final String message) { + super(message); + } + + @Override + public boolean isRetriable() { + return false; + } +} diff --git a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/TransmitQueue.java b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/TransmitQueue.java index 15ad958304..b2497fc7e7 100644 --- a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/TransmitQueue.java +++ b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/TransmitQueue.java @@ -135,12 +135,16 @@ abstract class TransmitQueue { tracker.closeTask(now, entry.getEnqueuedTicks(), entry.getTxTicks(), envelope.getExecutionTimeNanos()); // We have freed up a slot, try to transmit something + tryTransmit(now); + + return Optional.of(entry); + } + + final void tryTransmit(final long now) { final int toSend = canTransmitCount(inflight.size()); if (toSend > 0 && !pending.isEmpty()) { transmitEntries(toSend, now); } - - return Optional.of(entry); } private void transmitEntries(final int maxTransmit, final long now) { @@ -246,6 +250,16 @@ abstract class TransmitQueue { } } + final void remove(final long now) { + final TransmittedConnectionEntry txe = inflight.poll(); + if (txe == null) { + final ConnectionEntry entry = pending.pop(); + tracker.closeTask(now, entry.getEnqueuedTicks(), 0, 0); + } else { + tracker.closeTask(now, txe.getEnqueuedTicks(), txe.getTxTicks(), 0); + } + } + @VisibleForTesting Deque getInflight() { return inflight; @@ -318,5 +332,4 @@ abstract class TransmitQueue { } queue.clear(); } - } diff --git a/opendaylight/md-sal/cds-access-client/src/test/java/org/opendaylight/controller/cluster/access/client/AbstractClientConnectionTest.java b/opendaylight/md-sal/cds-access-client/src/test/java/org/opendaylight/controller/cluster/access/client/AbstractClientConnectionTest.java index 550dd9fa51..6a833a8dd0 100644 --- a/opendaylight/md-sal/cds-access-client/src/test/java/org/opendaylight/controller/cluster/access/client/AbstractClientConnectionTest.java +++ b/opendaylight/md-sal/cds-access-client/src/test/java/org/opendaylight/controller/cluster/access/client/AbstractClientConnectionTest.java @@ -43,7 +43,6 @@ import org.opendaylight.controller.cluster.access.concepts.ResponseEnvelope; import org.opendaylight.controller.cluster.access.concepts.RuntimeRequestException; import org.opendaylight.controller.cluster.access.concepts.SuccessEnvelope; import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; -import scala.concurrent.duration.FiniteDuration; public abstract class AbstractClientConnectionTest, U extends BackendInfo> { @@ -115,7 +114,7 @@ public abstract class AbstractClientConnectionTest timeout = connection.checkTimeout(context.ticker().read()); + final Optional timeout = connection.checkTimeout(context.ticker().read()); Assert.assertFalse(timeout.isPresent()); } @@ -123,8 +122,8 @@ public abstract class AbstractClientConnectionTest> callback = mock(Consumer.class); connection.sendRequest(createRequest(replyToProbe.ref()), callback); - final long now = context.ticker().read() + ConnectedClientConnection.REQUEST_TIMEOUT_NANOS; - final Optional timeout = connection.checkTimeout(now); + final long now = context.ticker().read() + ConnectedClientConnection.BACKEND_ALIVE_TIMEOUT_NANOS; + final Optional timeout = connection.checkTimeout(now); Assert.assertNull(timeout); } @@ -133,7 +132,7 @@ public abstract class AbstractClientConnectionTest> callback = mock(Consumer.class); connection.sendRequest(createRequest(replyToProbe.ref()), callback); final long now = context.ticker().read(); - final Optional timeout = connection.checkTimeout(now); + final Optional timeout = connection.checkTimeout(now); Assert.assertTrue(timeout.isPresent()); } diff --git a/opendaylight/md-sal/cds-access-client/src/test/java/org/opendaylight/controller/cluster/access/client/ConnectingClientConnectionTest.java b/opendaylight/md-sal/cds-access-client/src/test/java/org/opendaylight/controller/cluster/access/client/ConnectingClientConnectionTest.java index 8274d24ce2..b43642955b 100644 --- a/opendaylight/md-sal/cds-access-client/src/test/java/org/opendaylight/controller/cluster/access/client/ConnectingClientConnectionTest.java +++ b/opendaylight/md-sal/cds-access-client/src/test/java/org/opendaylight/controller/cluster/access/client/ConnectingClientConnectionTest.java @@ -191,7 +191,7 @@ public class ConnectingClientConnectionTest { @Test public void testSendRequestNeedsBackend() { queue.sendRequest(mockRequest, mockCallback); - final Optional ret = queue.checkTimeout(ticker.read()); + final Optional ret = queue.checkTimeout(ticker.read()); assertNotNull(ret); assertTrue(ret.isPresent()); } @@ -207,7 +207,7 @@ public class ConnectingClientConnectionTest { setupBackend(); queue.sendRequest(mockRequest, mockCallback); - final Optional ret = queue.checkTimeout(ticker.read()); + final Optional ret = queue.checkTimeout(ticker.read()); assertNotNull(ret); assertTrue(ret.isPresent()); assertTransmit(mockRequest, 0); @@ -215,7 +215,7 @@ public class ConnectingClientConnectionTest { @Test public void testRunTimeoutEmpty() throws NoProgressException { - Optional ret = queue.checkTimeout(ticker.read()); + Optional ret = queue.checkTimeout(ticker.read()); assertNotNull(ret); assertFalse(ret.isPresent()); } @@ -223,7 +223,7 @@ public class ConnectingClientConnectionTest { @Test public void testRunTimeoutWithoutShift() throws NoProgressException { queue.sendRequest(mockRequest, mockCallback); - Optional ret = queue.checkTimeout(ticker.read()); + Optional ret = queue.checkTimeout(ticker.read()); assertNotNull(ret); assertTrue(ret.isPresent()); } @@ -232,9 +232,9 @@ public class ConnectingClientConnectionTest { public void testRunTimeoutWithTimeoutLess() throws NoProgressException { queue.sendRequest(mockRequest, mockCallback); - ticker.advance(AbstractClientConnection.REQUEST_TIMEOUT_NANOS - 1); + ticker.advance(AbstractClientConnection.BACKEND_ALIVE_TIMEOUT_NANOS - 1); - Optional ret = queue.checkTimeout(ticker.read()); + Optional ret = queue.checkTimeout(ticker.read()); assertNotNull(ret); assertTrue(ret.isPresent()); } @@ -245,9 +245,9 @@ public class ConnectingClientConnectionTest { queue.sendRequest(mockRequest, mockCallback); - ticker.advance(AbstractClientConnection.REQUEST_TIMEOUT_NANOS); + ticker.advance(AbstractClientConnection.BACKEND_ALIVE_TIMEOUT_NANOS); - Optional ret = queue.checkTimeout(ticker.read()); + Optional ret = queue.checkTimeout(ticker.read()); assertNull(ret); } @@ -257,9 +257,9 @@ public class ConnectingClientConnectionTest { queue.sendRequest(mockRequest, mockCallback); - ticker.advance(AbstractClientConnection.REQUEST_TIMEOUT_NANOS + 1); + ticker.advance(AbstractClientConnection.BACKEND_ALIVE_TIMEOUT_NANOS + 1); - Optional ret = queue.checkTimeout(ticker.read()); + Optional ret = queue.checkTimeout(ticker.read()); assertNull(ret); } @@ -290,7 +290,7 @@ public class ConnectingClientConnectionTest { ticker.advance(AbstractClientConnection.NO_PROGRESS_TIMEOUT_NANOS); // No problem - Optional ret = queue.checkTimeout(ticker.read()); + Optional ret = queue.checkTimeout(ticker.read()); assertNotNull(ret); assertFalse(ret.isPresent()); } @@ -300,7 +300,7 @@ public class ConnectingClientConnectionTest { ticker.advance(AbstractClientConnection.NO_PROGRESS_TIMEOUT_NANOS + 1); // No problem - Optional ret = queue.checkTimeout(ticker.read()); + Optional ret = queue.checkTimeout(ticker.read()); assertNotNull(ret); assertFalse(ret.isPresent()); } @@ -348,7 +348,7 @@ public class ConnectingClientConnectionTest { ticker.advance(AbstractClientConnection.NO_PROGRESS_TIMEOUT_NANOS - 11); - Optional ret = queue.checkTimeout(ticker.read()); + Optional ret = queue.checkTimeout(ticker.read()); assertNull(ret); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java index 561c0e19b3..dc5c2d422f 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java @@ -118,7 +118,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { @Parameters(name = "{0}") public static Collection data() { return Arrays.asList(new Object[][] { - { DistributedDataStore.class, 7}, { ClientBackedDataStore.class, 60 } + { DistributedDataStore.class, 7}, { ClientBackedDataStore.class, 120 } }); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/resources/simplelogger.properties b/opendaylight/md-sal/sal-distributed-datastore/src/test/resources/simplelogger.properties index 6cf580c917..a9f623b036 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/resources/simplelogger.properties +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/resources/simplelogger.properties @@ -9,3 +9,4 @@ org.slf4j.simpleLogger.log.org.opendaylight.controller.cluster.datastore=debug org.slf4j.simpleLogger.log.org.opendaylight.controller.cluster.databroker.actors.dds=debug org.slf4j.simpleLogger.log.org.opendaylight.controller.cluster.datastore.node.utils.stream=off org.slf4j.simpleLogger.log.org.opendaylight.controller.cluster.sharding=debug +org.slf4j.simpleLogger.log.org.opendaylight.controller.cluster.access.client=debug -- 2.36.6