Make AbstractClientConnection timeouts configurable 87/60087/4
authorTom Pantelis <tompantelis@gmail.com>
Fri, 7 Jul 2017 14:59:40 +0000 (10:59 -0400)
committerRobert Varga <nite@hq.sk>
Mon, 10 Jul 2017 08:33:52 +0000 (08:33 +0000)
So we can tweak them in production and unit tests.

Change-Id: I39ce8cdf3cd5397a71f52c42357943dfe5eccb7c
Signed-off-by: Tom Pantelis <tompantelis@gmail.com>
opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/AbstractClientConnection.java
opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ClientActorConfig.java
opendaylight/md-sal/cds-access-client/src/test/java/org/opendaylight/controller/cluster/access/client/AccessClientUtil.java
opendaylight/md-sal/cds-access-client/src/test/java/org/opendaylight/controller/cluster/access/client/ActorBehaviorTest.java
opendaylight/md-sal/cds-access-client/src/test/java/org/opendaylight/controller/cluster/access/client/ConnectedClientConnectionTest.java
opendaylight/md-sal/cds-access-client/src/test/java/org/opendaylight/controller/cluster/access/client/ConnectingClientConnectionTest.java
opendaylight/md-sal/cds-access-client/src/test/java/org/opendaylight/controller/cluster/access/client/ReconnectingClientConnectionTest.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DatastoreContext.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/distributed_datastore_provider/DistributedConfigDataStoreProviderModule.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/distributed_datastore_provider/DistributedOperationalDataStoreProviderModule.java
opendaylight/md-sal/sal-distributed-datastore/src/main/yang/distributed-datastore-provider.yang

index b55e022..5f23557 100644 (file)
@@ -50,24 +50,21 @@ public abstract class AbstractClientConnection<T extends BackendInfo> {
      */
     /**
      * Backend aliveness timer. This is reset whenever we receive a response from the backend and kept armed whenever
-     * we have an outstanding request. If when this time expires, we tear down this connection and attept to reconnect
+     * we have an outstanding request. When this timer expires, we tear down this connection and attempt to reconnect
      * it.
      */
-    @VisibleForTesting
-    static final long BACKEND_ALIVE_TIMEOUT_NANOS = TimeUnit.SECONDS.toNanos(30);
+    public static final long DEFAULT_BACKEND_ALIVE_TIMEOUT_NANOS = TimeUnit.SECONDS.toNanos(30);
 
     /**
      * Request timeout. If the request fails to complete within this time since it was originally enqueued, we time
      * the request out.
      */
-    @VisibleForTesting
-    static final long REQUEST_TIMEOUT_NANOS = TimeUnit.MINUTES.toNanos(2);
+    public static final long DEFAULT_REQUEST_TIMEOUT_NANOS = TimeUnit.MINUTES.toNanos(2);
 
     /**
      * No progress timeout. A client fails to make any forward progress in this time, it will terminate itself.
      */
-    @VisibleForTesting
-    static final long NO_PROGRESS_TIMEOUT_NANOS = TimeUnit.MINUTES.toNanos(15);
+    public static final long DEFAULT_NO_PROGRESS_TIMEOUT_NANOS = TimeUnit.MINUTES.toNanos(15);
 
     // Emit a debug entry if we sleep for more that this amount
     private static final long DEBUG_DELAY_NANOS = TimeUnit.MILLISECONDS.toNanos(100);
@@ -167,7 +164,7 @@ public abstract class AbstractClientConnection<T extends BackendInfo> {
 
             if (queue.isEmpty()) {
                 // The queue is becoming non-empty, schedule a timer.
-                scheduleTimer(entry.getEnqueuedTicks() + REQUEST_TIMEOUT_NANOS - now);
+                scheduleTimer(entry.getEnqueuedTicks() + context.config().getRequestTimeout() - now);
             }
             return queue.enqueue(entry, now);
         } finally {
@@ -259,7 +256,7 @@ public abstract class AbstractClientConnection<T extends BackendInfo> {
 
         // If the delay is negative, we need to schedule an action immediately. While the caller could have checked
         // for that condition and take appropriate action, but this is more convenient and less error-prone.
-        final long normalized =  delay <= 0 ? 0 : Math.min(delay, BACKEND_ALIVE_TIMEOUT_NANOS);
+        final long normalized =  delay <= 0 ? 0 : Math.min(delay, context.config().getBackendAlivenessTimerInterval());
 
         final FiniteDuration dur = FiniteDuration.fromNanos(normalized);
         LOG.debug("{}: connection {} scheduling timeout in {}", context.persistenceId(), this, dur);
@@ -269,7 +266,7 @@ public abstract class AbstractClientConnection<T extends BackendInfo> {
 
     /**
      * Check this queue for timeout and initiate reconnection if that happened. If the queue has not made progress
-     * in {@link #NO_PROGRESS_TIMEOUT_NANOS} nanoseconds, it will be aborted.
+     * in {@link ClientActorConfig#getNoProgressTimeout()} nanoseconds, it will be aborted.
      *
      * @param current Current behavior
      * @return Next behavior to use
@@ -288,7 +285,7 @@ public abstract class AbstractClientConnection<T extends BackendInfo> {
             // The following line is only reliable when queue is not forwarding, but such state should not last long.
             // FIXME: BUG-8422: this may not be accurate w.r.t. replayed entries
             final long ticksSinceProgress = queue.ticksStalling(now);
-            if (ticksSinceProgress >= NO_PROGRESS_TIMEOUT_NANOS) {
+            if (ticksSinceProgress >= context.config().getNoProgressTimeout()) {
                 LOG.error("Queue {} has not seen progress in {} seconds, failing all requests", this,
                     TimeUnit.NANOSECONDS.toSeconds(ticksSinceProgress));
 
@@ -351,7 +348,7 @@ public abstract class AbstractClientConnection<T extends BackendInfo> {
         }
 
         final long backendSilentTicks = backendSilentTicks(now);
-        if (backendSilentTicks >= BACKEND_ALIVE_TIMEOUT_NANOS) {
+        if (backendSilentTicks >= context.config().getBackendAlivenessTimerInterval()) {
             LOG.debug("{}: Connection {} has not seen activity from backend for {} nanoseconds, timing out",
                 context.persistenceId(), this, backendSilentTicks);
             return null;
@@ -360,8 +357,9 @@ public abstract class AbstractClientConnection<T extends BackendInfo> {
         int tasksTimedOut = 0;
         for (ConnectionEntry head = queue.peek(); head != null; head = queue.peek()) {
             final long beenOpen = now - head.getEnqueuedTicks();
-            if (beenOpen < REQUEST_TIMEOUT_NANOS) {
-                return Optional.of(REQUEST_TIMEOUT_NANOS - beenOpen);
+            final long requestTimeout = context.config().getRequestTimeout();
+            if (beenOpen < requestTimeout) {
+                return Optional.of(requestTimeout - beenOpen);
             }
 
             tasksTimedOut++;
index 460420a..6e7df7e 100644 (file)
@@ -34,4 +34,27 @@ public interface ClientActorConfig {
      * @return the directory name
      */
     String getTempFileDirectory();
+
+    /**
+     * Returns the timer interval whereby, on expiration after response inactivity from the back-end, the connection to
+     * the back-end is torn down and reconnection is attempted.
+     *
+     * @return timer interval in nanoseconds.
+     */
+    long getBackendAlivenessTimerInterval();
+
+    /**
+     * Returns the timeout interval whereby requests are failed.
+     *
+     * @return the timeout interval in nanoseconds.
+     */
+    long getRequestTimeout();
+
+    /**
+     * Returns the timeout interval whereby the client front-end hasn't made progress with the back-end on any request
+     * and terminates.
+     *
+     * @return the timeout interval in nanoseconds.
+     */
+    long getNoProgressTimeout();
 }
index 6fdc007..586fb59 100644 (file)
@@ -35,6 +35,11 @@ public class AccessClientUtil {
         ClientActorConfig mockConfig = mock(ClientActorConfig.class);
         doReturn(2_000_000).when(mockConfig).getMaximumMessageSliceSize();
         doReturn(1_000_000_000).when(mockConfig).getFileBackedStreamingThreshold();
+        doReturn(AbstractClientConnection.DEFAULT_BACKEND_ALIVE_TIMEOUT_NANOS)
+                .when(mockConfig).getBackendAlivenessTimerInterval();
+        doReturn(AbstractClientConnection.DEFAULT_REQUEST_TIMEOUT_NANOS).when(mockConfig).getRequestTimeout();
+        doReturn(AbstractClientConnection.DEFAULT_NO_PROGRESS_TIMEOUT_NANOS)
+                 .when(mockConfig).getNoProgressTimeout();
         return mockConfig;
     }
 
index 6804b1f..513253e 100644 (file)
@@ -174,6 +174,7 @@ public class ActorBehaviorTest {
     private static class MockedActor extends AbstractClientActor {
 
         private final ClientActorBehavior<?> initialBehavior;
+        private final ClientActorConfig mockConfig = AccessClientUtil.newMockClientActorConfig();
 
         private static Props props(final FrontendIdentifier frontendId, final ClientActorBehavior<?> initialBehavior) {
             return Props.create(MockedActor.class, () -> new MockedActor(frontendId, initialBehavior));
@@ -191,22 +192,7 @@ public class ActorBehaviorTest {
 
         @Override
         protected ClientActorConfig getClientActorConfig() {
-            return new ClientActorConfig() {
-                @Override
-                public String getTempFileDirectory() {
-                    return null;
-                }
-
-                @Override
-                public int getMaximumMessageSliceSize() {
-                    return 2000000;
-                }
-
-                @Override
-                public int getFileBackedStreamingThreshold() {
-                    return 1000000000;
-                }
-            };
+            return mockConfig;
         }
     }
 
index 29cac23..0f30806 100644 (file)
@@ -27,7 +27,7 @@ public class ConnectedClientConnectionTest
     public void testCheckTimeoutConnectionTimedout() throws Exception {
         final Consumer<Response<?, ?>> callback = mock(Consumer.class);
         connection.sendRequest(createRequest(replyToProbe.ref()), callback);
-        final long now = context.ticker().read() + ConnectedClientConnection.BACKEND_ALIVE_TIMEOUT_NANOS;
+        final long now = context.ticker().read() + ConnectedClientConnection.DEFAULT_BACKEND_ALIVE_TIMEOUT_NANOS;
         final Optional<Long> timeout = connection.checkTimeout(now);
         Assert.assertNull(timeout);
     }
index b436429..7dd609e 100644 (file)
@@ -142,6 +142,9 @@ public class ConnectingClientConnectionTest {
         ticker.advance(ThreadLocalRandom.current().nextLong());
         doReturn(ticker).when(mockContext).ticker();
 
+        final ClientActorConfig mockConfig = AccessClientUtil.newMockClientActorConfig();
+        doReturn(mockConfig).when(mockContext).config();
+
         mockActor = TestProbe.apply(actorSystem);
         mockBackendInfo = new BackendInfo(mockActor.ref(), 0, ABIVersion.current(), 5);
         mockRequest = new MockRequest(mockIdentifier, mockReplyTo);
@@ -232,7 +235,7 @@ public class ConnectingClientConnectionTest {
     public void testRunTimeoutWithTimeoutLess() throws NoProgressException {
         queue.sendRequest(mockRequest, mockCallback);
 
-        ticker.advance(AbstractClientConnection.BACKEND_ALIVE_TIMEOUT_NANOS - 1);
+        ticker.advance(AbstractClientConnection.DEFAULT_BACKEND_ALIVE_TIMEOUT_NANOS - 1);
 
         Optional<Long> ret = queue.checkTimeout(ticker.read());
         assertNotNull(ret);
@@ -245,7 +248,7 @@ public class ConnectingClientConnectionTest {
 
         queue.sendRequest(mockRequest, mockCallback);
 
-        ticker.advance(AbstractClientConnection.BACKEND_ALIVE_TIMEOUT_NANOS);
+        ticker.advance(AbstractClientConnection.DEFAULT_BACKEND_ALIVE_TIMEOUT_NANOS);
 
         Optional<Long> ret = queue.checkTimeout(ticker.read());
         assertNull(ret);
@@ -257,7 +260,7 @@ public class ConnectingClientConnectionTest {
 
         queue.sendRequest(mockRequest, mockCallback);
 
-        ticker.advance(AbstractClientConnection.BACKEND_ALIVE_TIMEOUT_NANOS + 1);
+        ticker.advance(AbstractClientConnection.DEFAULT_BACKEND_ALIVE_TIMEOUT_NANOS + 1);
 
         Optional<Long> ret = queue.checkTimeout(ticker.read());
         assertNull(ret);
@@ -267,7 +270,7 @@ public class ConnectingClientConnectionTest {
     public void testRunTimeoutWithoutProgressExact() throws NoProgressException {
         queue.sendRequest(mockRequest, mockCallback);
 
-        ticker.advance(AbstractClientConnection.NO_PROGRESS_TIMEOUT_NANOS);
+        ticker.advance(AbstractClientConnection.DEFAULT_NO_PROGRESS_TIMEOUT_NANOS);
 
         // Kaboom
         queue.runTimer((ClientActorBehavior) mockBehavior);
@@ -278,7 +281,7 @@ public class ConnectingClientConnectionTest {
     public void testRunTimeoutWithoutProgressMore() throws NoProgressException {
         queue.sendRequest(mockRequest, mockCallback);
 
-        ticker.advance(AbstractClientConnection.NO_PROGRESS_TIMEOUT_NANOS + 1);
+        ticker.advance(AbstractClientConnection.DEFAULT_NO_PROGRESS_TIMEOUT_NANOS + 1);
 
         // Kaboom
         queue.runTimer((ClientActorBehavior) mockBehavior);
@@ -287,7 +290,7 @@ public class ConnectingClientConnectionTest {
 
     @Test
     public void testRunTimeoutEmptyWithoutProgressExact() throws NoProgressException {
-        ticker.advance(AbstractClientConnection.NO_PROGRESS_TIMEOUT_NANOS);
+        ticker.advance(AbstractClientConnection.DEFAULT_NO_PROGRESS_TIMEOUT_NANOS);
 
         // No problem
         Optional<Long> ret = queue.checkTimeout(ticker.read());
@@ -297,7 +300,7 @@ public class ConnectingClientConnectionTest {
 
     @Test
     public void testRunTimeoutEmptyWithoutProgressMore() throws NoProgressException {
-        ticker.advance(AbstractClientConnection.NO_PROGRESS_TIMEOUT_NANOS + 1);
+        ticker.advance(AbstractClientConnection.DEFAULT_NO_PROGRESS_TIMEOUT_NANOS + 1);
 
         // No problem
         Optional<Long> ret = queue.checkTimeout(ticker.read());
@@ -346,7 +349,7 @@ public class ConnectingClientConnectionTest {
         queue.sendRequest(mockRequest2, mockCallback);
         queue.receiveResponse(mockResponseEnvelope);
 
-        ticker.advance(AbstractClientConnection.NO_PROGRESS_TIMEOUT_NANOS - 11);
+        ticker.advance(AbstractClientConnection.DEFAULT_NO_PROGRESS_TIMEOUT_NANOS - 11);
 
         Optional<Long> ret = queue.checkTimeout(ticker.read());
         assertNull(ret);
index a281b12..0805f56 100644 (file)
@@ -34,7 +34,7 @@ public class ReconnectingClientConnectionTest
     public void testCheckTimeoutConnectionTimedout() throws Exception {
         final Consumer<Response<?, ?>> callback = mock(Consumer.class);
         connection.sendRequest(createRequest(replyToProbe.ref()), callback);
-        final long now = context.ticker().read() + ConnectedClientConnection.BACKEND_ALIVE_TIMEOUT_NANOS;
+        final long now = context.ticker().read() + ConnectedClientConnection.DEFAULT_BACKEND_ALIVE_TIMEOUT_NANOS;
         final Optional<Long> timeout = connection.checkTimeout(now);
         Assert.assertNotNull(timeout);
         Assert.assertTrue(timeout.isPresent());
index 890e103..dee8142 100644 (file)
@@ -15,6 +15,7 @@ import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import org.apache.commons.lang3.text.WordUtils;
+import org.opendaylight.controller.cluster.access.client.AbstractClientConnection;
 import org.opendaylight.controller.cluster.access.client.ClientActorConfig;
 import org.opendaylight.controller.cluster.common.actor.AkkaConfigurationReader;
 import org.opendaylight.controller.cluster.common.actor.FileAkkaConfigurationReader;
@@ -88,6 +89,9 @@ public class DatastoreContext implements ClientActorConfig {
     private boolean transactionDebugContextEnabled = false;
     private String shardManagerPersistenceId;
     private int maximumMessageSliceSize = DEFAULT_MAX_MESSAGE_SLICE_SIZE;
+    private long backendAlivenessTimerInterval = AbstractClientConnection.DEFAULT_BACKEND_ALIVE_TIMEOUT_NANOS;
+    private long requestTimeout = AbstractClientConnection.DEFAULT_REQUEST_TIMEOUT_NANOS;
+    private long noProgressTimeout = AbstractClientConnection.DEFAULT_NO_PROGRESS_TIMEOUT_NANOS;
 
     public static Set<String> getGlobalDatastoreNames() {
         return GLOBAL_DATASTORE_NAMES;
@@ -125,6 +129,9 @@ public class DatastoreContext implements ClientActorConfig {
         this.transactionDebugContextEnabled = other.transactionDebugContextEnabled;
         this.shardManagerPersistenceId = other.shardManagerPersistenceId;
         this.useTellBasedProtocol = other.useTellBasedProtocol;
+        this.backendAlivenessTimerInterval = other.backendAlivenessTimerInterval;
+        this.requestTimeout = other.requestTimeout;
+        this.noProgressTimeout = other.noProgressTimeout;
 
         setShardJournalRecoveryLogBatchSize(other.raftConfig.getJournalRecoveryLogBatchSize());
         setSnapshotBatchCount(other.raftConfig.getSnapshotBatchCount());
@@ -315,6 +322,21 @@ public class DatastoreContext implements ClientActorConfig {
         return maximumMessageSliceSize;
     }
 
+    @Override
+    public long getBackendAlivenessTimerInterval() {
+        return backendAlivenessTimerInterval;
+    }
+
+    @Override
+    public long getRequestTimeout() {
+        return requestTimeout;
+    }
+
+    @Override
+    public long getNoProgressTimeout() {
+        return noProgressTimeout;
+    }
+
     public static class Builder implements org.opendaylight.yangtools.concepts.Builder<DatastoreContext> {
         private final DatastoreContext datastoreContext;
         private int maxShardDataChangeExecutorPoolSize =
@@ -566,7 +588,7 @@ public class DatastoreContext implements ClientActorConfig {
             return this;
         }
 
-        public Builder fileBackedStreamingThresholdInMegabytes(final int  fileBackedStreamingThreshold) {
+        public Builder fileBackedStreamingThresholdInMegabytes(final int fileBackedStreamingThreshold) {
             datastoreContext.setFileBackedStreamingThreshold(fileBackedStreamingThreshold * ConfigParams.MEGABYTE);
             return this;
         }
@@ -576,6 +598,21 @@ public class DatastoreContext implements ClientActorConfig {
             return this;
         }
 
+        public Builder backendAlivenessTimerIntervalInSeconds(final long interval) {
+            datastoreContext.backendAlivenessTimerInterval = TimeUnit.SECONDS.toNanos(interval);
+            return this;
+        }
+
+        public Builder frontendRequestTimeoutInSeconds(final long timeout) {
+            datastoreContext.requestTimeout = TimeUnit.SECONDS.toNanos(timeout);
+            return this;
+        }
+
+        public Builder frontendNoProgressTimeoutInSeconds(final long timeout) {
+            datastoreContext.noProgressTimeout = TimeUnit.SECONDS.toNanos(timeout);
+            return this;
+        }
+
         @Override
         public DatastoreContext build() {
             datastoreContext.dataStoreProperties = InMemoryDOMDataStoreConfigProperties.create(
index f0938e8..18071e5 100644 (file)
@@ -99,6 +99,9 @@ public class DistributedConfigDataStoreProviderModule extends AbstractDistribute
                 .maximumMessageSliceSize(props.getMaximumMessageSliceSize().getValue().intValue())
                 .useTellBasedProtocol(props.getUseTellBasedProtocol())
                 .syncIndexThreshold(props.getSyncIndexThreshold().getValue())
+                .backendAlivenessTimerIntervalInSeconds(props.getBackendAlivenessTimerIntervalInSeconds().getValue())
+                .frontendRequestTimeoutInSeconds(props.getFrontendRequestTimeoutInSeconds().getValue())
+                .frontendNoProgressTimeoutInSeconds(props.getFrontendNoProgressTimeoutInSeconds().getValue())
                 .build();
     }
 
index a31d517..8a87466 100644 (file)
@@ -99,6 +99,9 @@ public class DistributedOperationalDataStoreProviderModule
                 .maximumMessageSliceSize(props.getMaximumMessageSliceSize().getValue().intValue())
                 .useTellBasedProtocol(props.getUseTellBasedProtocol())
                 .syncIndexThreshold(props.getSyncIndexThreshold().getValue())
+                .backendAlivenessTimerIntervalInSeconds(props.getBackendAlivenessTimerIntervalInSeconds().getValue())
+                .frontendRequestTimeoutInSeconds(props.getFrontendRequestTimeoutInSeconds().getValue())
+                .frontendNoProgressTimeoutInSeconds(props.getFrontendNoProgressTimeoutInSeconds().getValue())
                 .build();
     }
 
index 8b3d72d..ea5dabf 100644 (file)
@@ -264,6 +264,26 @@ module distributed-datastore-provider {
                          commitIndex trails the leader's journal by more than this amount of entries the follower
                          is considered to be out-of-sync.";
         }
+
+        leaf backend-aliveness-timer-interval-in-seconds {
+            default 30;
+            type non-zero-uint32-type;
+            description "The timer interval whereby, on expiration after response inactivity from the back-end,
+                        the connection to the back-end is torn down and reconnection is attempted.";
+        }
+
+        leaf frontend-request-timeout-in-seconds {
+            default 120; // 2 minutes
+            type non-zero-uint32-type;
+            description "The timeout interval whereby client frontend transaction requests are failed.";
+        }
+
+        leaf frontend-no-progress-timeout-in-seconds {
+            default 900; // 15 minutes
+            type non-zero-uint32-type;
+            description "The timeout interval whereby the client front-end hasn't made progress with the
+                         back-end on any request and terminates.";
+        }
     }
 
     // Augments the 'configuration' choice node under modules/module.

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.