So we can tweak them in production and unit tests.
Change-Id: I39ce8cdf3cd5397a71f52c42357943dfe5eccb7c
Signed-off-by: Tom Pantelis <tompantelis@gmail.com>
*/
/**
* Backend aliveness timer. This is reset whenever we receive a response from the backend and kept armed whenever
- * we have an outstanding request. If when this time expires, we tear down this connection and attept to reconnect
+ * we have an outstanding request. If when this time expires, we tear down this connection and attempt to reconnect
* it.
*/
- @VisibleForTesting
- static final long BACKEND_ALIVE_TIMEOUT_NANOS = TimeUnit.SECONDS.toNanos(30);
+ public static final long DEFAULT_BACKEND_ALIVE_TIMEOUT_NANOS = TimeUnit.SECONDS.toNanos(30);
/**
* Request timeout. If the request fails to complete within this time since it was originally enqueued, we time
* the request out.
*/
- @VisibleForTesting
- static final long REQUEST_TIMEOUT_NANOS = TimeUnit.MINUTES.toNanos(2);
+ public static final long DEFAULT_REQUEST_TIMEOUT_NANOS = TimeUnit.MINUTES.toNanos(2);
/**
* No progress timeout. A client fails to make any forward progress in this time, it will terminate itself.
*/
- @VisibleForTesting
- static final long NO_PROGRESS_TIMEOUT_NANOS = TimeUnit.MINUTES.toNanos(15);
+ public static final long DEFAULT_NO_PROGRESS_TIMEOUT_NANOS = TimeUnit.MINUTES.toNanos(15);
// Emit a debug entry if we sleep for more that this amount
private static final long DEBUG_DELAY_NANOS = TimeUnit.MILLISECONDS.toNanos(100);
if (queue.isEmpty()) {
// The queue is becoming non-empty, schedule a timer.
- scheduleTimer(entry.getEnqueuedTicks() + REQUEST_TIMEOUT_NANOS - now);
+ scheduleTimer(entry.getEnqueuedTicks() + context.config().getRequestTimeout() - now);
}
return queue.enqueue(entry, now);
} finally {
// If the delay is negative, we need to schedule an action immediately. While the caller could have checked
// for that condition and take appropriate action, but this is more convenient and less error-prone.
- final long normalized = delay <= 0 ? 0 : Math.min(delay, BACKEND_ALIVE_TIMEOUT_NANOS);
+ final long normalized = delay <= 0 ? 0 : Math.min(delay, context.config().getBackendAlivenessTimerInterval());
final FiniteDuration dur = FiniteDuration.fromNanos(normalized);
LOG.debug("{}: connection {} scheduling timeout in {}", context.persistenceId(), this, dur);
/**
* Check this queue for timeout and initiate reconnection if that happened. If the queue has not made progress
- * in {@link #NO_PROGRESS_TIMEOUT_NANOS} nanoseconds, it will be aborted.
+ * in {@link #DEFAULT_NO_PROGRESS_TIMEOUT_NANOS} nanoseconds, it will be aborted.
*
* @param current Current behavior
* @return Next behavior to use
// The following line is only reliable when queue is not forwarding, but such state should not last long.
// FIXME: BUG-8422: this may not be accurate w.r.t. replayed entries
final long ticksSinceProgress = queue.ticksStalling(now);
- if (ticksSinceProgress >= NO_PROGRESS_TIMEOUT_NANOS) {
+ if (ticksSinceProgress >= context.config().getNoProgressTimeout()) {
LOG.error("Queue {} has not seen progress in {} seconds, failing all requests", this,
TimeUnit.NANOSECONDS.toSeconds(ticksSinceProgress));
}
final long backendSilentTicks = backendSilentTicks(now);
- if (backendSilentTicks >= BACKEND_ALIVE_TIMEOUT_NANOS) {
+ if (backendSilentTicks >= context.config().getBackendAlivenessTimerInterval()) {
LOG.debug("{}: Connection {} has not seen activity from backend for {} nanoseconds, timing out",
context.persistenceId(), this, backendSilentTicks);
return null;
int tasksTimedOut = 0;
for (ConnectionEntry head = queue.peek(); head != null; head = queue.peek()) {
final long beenOpen = now - head.getEnqueuedTicks();
- if (beenOpen < REQUEST_TIMEOUT_NANOS) {
- return Optional.of(REQUEST_TIMEOUT_NANOS - beenOpen);
+ final long requestTimeout = context.config().getRequestTimeout();
+ if (beenOpen < requestTimeout) {
+ return Optional.of(requestTimeout - beenOpen);
}
tasksTimedOut++;
* @return the directory name
*/
String getTempFileDirectory();
+
+ /**
+ * Returns the timer interval whereby, on expiration after response inactivity from the back-end, the connection to
+ * the back-end is torn down and reconnection is attempted.
+
+ * @return timer interval in nanoseconds.
+ */
+ long getBackendAlivenessTimerInterval();
+
+ /**
+ * Returns the timeout interval whereby requests are failed.
+ *
+ * @return the timeout interval in nanoseconds.
+ */
+ long getRequestTimeout();
+
+ /**
+ * Returns the timeout interval whereby the client front-end hasn't made progress with the back-end on any request
+ * and terminates.
+ *
+ * @return the timeout interval in nanoseconds.
+ */
+ long getNoProgressTimeout();
}
ClientActorConfig mockConfig = mock(ClientActorConfig.class);
doReturn(2_000_000).when(mockConfig).getMaximumMessageSliceSize();
doReturn(1_000_000_000).when(mockConfig).getFileBackedStreamingThreshold();
+ doReturn(AbstractClientConnection.DEFAULT_BACKEND_ALIVE_TIMEOUT_NANOS)
+ .when(mockConfig).getBackendAlivenessTimerInterval();
+ doReturn(AbstractClientConnection.DEFAULT_REQUEST_TIMEOUT_NANOS).when(mockConfig).getRequestTimeout();
+ doReturn(AbstractClientConnection.DEFAULT_NO_PROGRESS_TIMEOUT_NANOS)
+ .when(mockConfig).getNoProgressTimeout();
return mockConfig;
}
private static class MockedActor extends AbstractClientActor {
private final ClientActorBehavior<?> initialBehavior;
+ private final ClientActorConfig mockConfig = AccessClientUtil.newMockClientActorConfig();
private static Props props(final FrontendIdentifier frontendId, final ClientActorBehavior<?> initialBehavior) {
return Props.create(MockedActor.class, () -> new MockedActor(frontendId, initialBehavior));
@Override
protected ClientActorConfig getClientActorConfig() {
- return new ClientActorConfig() {
- @Override
- public String getTempFileDirectory() {
- return null;
- }
-
- @Override
- public int getMaximumMessageSliceSize() {
- return 2000000;
- }
-
- @Override
- public int getFileBackedStreamingThreshold() {
- return 1000000000;
- }
- };
+ return mockConfig;
}
}
public void testCheckTimeoutConnectionTimedout() throws Exception {
final Consumer<Response<?, ?>> callback = mock(Consumer.class);
connection.sendRequest(createRequest(replyToProbe.ref()), callback);
- final long now = context.ticker().read() + ConnectedClientConnection.BACKEND_ALIVE_TIMEOUT_NANOS;
+ final long now = context.ticker().read() + ConnectedClientConnection.DEFAULT_BACKEND_ALIVE_TIMEOUT_NANOS;
final Optional<Long> timeout = connection.checkTimeout(now);
Assert.assertNull(timeout);
}
ticker.advance(ThreadLocalRandom.current().nextLong());
doReturn(ticker).when(mockContext).ticker();
+ final ClientActorConfig mockConfig = AccessClientUtil.newMockClientActorConfig();
+ doReturn(mockConfig).when(mockContext).config();
+
mockActor = TestProbe.apply(actorSystem);
mockBackendInfo = new BackendInfo(mockActor.ref(), 0, ABIVersion.current(), 5);
mockRequest = new MockRequest(mockIdentifier, mockReplyTo);
public void testRunTimeoutWithTimeoutLess() throws NoProgressException {
queue.sendRequest(mockRequest, mockCallback);
- ticker.advance(AbstractClientConnection.BACKEND_ALIVE_TIMEOUT_NANOS - 1);
+ ticker.advance(AbstractClientConnection.DEFAULT_BACKEND_ALIVE_TIMEOUT_NANOS - 1);
Optional<Long> ret = queue.checkTimeout(ticker.read());
assertNotNull(ret);
queue.sendRequest(mockRequest, mockCallback);
- ticker.advance(AbstractClientConnection.BACKEND_ALIVE_TIMEOUT_NANOS);
+ ticker.advance(AbstractClientConnection.DEFAULT_BACKEND_ALIVE_TIMEOUT_NANOS);
Optional<Long> ret = queue.checkTimeout(ticker.read());
assertNull(ret);
queue.sendRequest(mockRequest, mockCallback);
- ticker.advance(AbstractClientConnection.BACKEND_ALIVE_TIMEOUT_NANOS + 1);
+ ticker.advance(AbstractClientConnection.DEFAULT_BACKEND_ALIVE_TIMEOUT_NANOS + 1);
Optional<Long> ret = queue.checkTimeout(ticker.read());
assertNull(ret);
public void testRunTimeoutWithoutProgressExact() throws NoProgressException {
queue.sendRequest(mockRequest, mockCallback);
- ticker.advance(AbstractClientConnection.NO_PROGRESS_TIMEOUT_NANOS);
+ ticker.advance(AbstractClientConnection.DEFAULT_NO_PROGRESS_TIMEOUT_NANOS);
// Kaboom
queue.runTimer((ClientActorBehavior) mockBehavior);
public void testRunTimeoutWithoutProgressMore() throws NoProgressException {
queue.sendRequest(mockRequest, mockCallback);
- ticker.advance(AbstractClientConnection.NO_PROGRESS_TIMEOUT_NANOS + 1);
+ ticker.advance(AbstractClientConnection.DEFAULT_NO_PROGRESS_TIMEOUT_NANOS + 1);
// Kaboom
queue.runTimer((ClientActorBehavior) mockBehavior);
@Test
public void testRunTimeoutEmptyWithoutProgressExact() throws NoProgressException {
- ticker.advance(AbstractClientConnection.NO_PROGRESS_TIMEOUT_NANOS);
+ ticker.advance(AbstractClientConnection.DEFAULT_NO_PROGRESS_TIMEOUT_NANOS);
// No problem
Optional<Long> ret = queue.checkTimeout(ticker.read());
@Test
public void testRunTimeoutEmptyWithoutProgressMore() throws NoProgressException {
- ticker.advance(AbstractClientConnection.NO_PROGRESS_TIMEOUT_NANOS + 1);
+ ticker.advance(AbstractClientConnection.DEFAULT_NO_PROGRESS_TIMEOUT_NANOS + 1);
// No problem
Optional<Long> ret = queue.checkTimeout(ticker.read());
queue.sendRequest(mockRequest2, mockCallback);
queue.receiveResponse(mockResponseEnvelope);
- ticker.advance(AbstractClientConnection.NO_PROGRESS_TIMEOUT_NANOS - 11);
+ ticker.advance(AbstractClientConnection.DEFAULT_NO_PROGRESS_TIMEOUT_NANOS - 11);
Optional<Long> ret = queue.checkTimeout(ticker.read());
assertNull(ret);
public void testCheckTimeoutConnectionTimedout() throws Exception {
final Consumer<Response<?, ?>> callback = mock(Consumer.class);
connection.sendRequest(createRequest(replyToProbe.ref()), callback);
- final long now = context.ticker().read() + ConnectedClientConnection.BACKEND_ALIVE_TIMEOUT_NANOS;
+ final long now = context.ticker().read() + ConnectedClientConnection.DEFAULT_BACKEND_ALIVE_TIMEOUT_NANOS;
final Optional<Long> timeout = connection.checkTimeout(now);
Assert.assertNotNull(timeout);
Assert.assertTrue(timeout.isPresent());
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.text.WordUtils;
+import org.opendaylight.controller.cluster.access.client.AbstractClientConnection;
import org.opendaylight.controller.cluster.access.client.ClientActorConfig;
import org.opendaylight.controller.cluster.common.actor.AkkaConfigurationReader;
import org.opendaylight.controller.cluster.common.actor.FileAkkaConfigurationReader;
private boolean transactionDebugContextEnabled = false;
private String shardManagerPersistenceId;
private int maximumMessageSliceSize = DEFAULT_MAX_MESSAGE_SLICE_SIZE;
+ private long backendAlivenessTimerInterval = AbstractClientConnection.DEFAULT_BACKEND_ALIVE_TIMEOUT_NANOS;
+ private long requestTimeout = AbstractClientConnection.DEFAULT_REQUEST_TIMEOUT_NANOS;
+ private long noProgressTimeout = AbstractClientConnection.DEFAULT_NO_PROGRESS_TIMEOUT_NANOS;
public static Set<String> getGlobalDatastoreNames() {
return GLOBAL_DATASTORE_NAMES;
this.transactionDebugContextEnabled = other.transactionDebugContextEnabled;
this.shardManagerPersistenceId = other.shardManagerPersistenceId;
this.useTellBasedProtocol = other.useTellBasedProtocol;
+ this.backendAlivenessTimerInterval = other.backendAlivenessTimerInterval;
+ this.requestTimeout = other.requestTimeout;
+ this.noProgressTimeout = other.noProgressTimeout;
setShardJournalRecoveryLogBatchSize(other.raftConfig.getJournalRecoveryLogBatchSize());
setSnapshotBatchCount(other.raftConfig.getSnapshotBatchCount());
return maximumMessageSliceSize;
}
+ @Override
+ public long getBackendAlivenessTimerInterval() {
+ return backendAlivenessTimerInterval;
+ }
+
+ @Override
+ public long getRequestTimeout() {
+ return requestTimeout;
+ }
+
+ @Override
+ public long getNoProgressTimeout() {
+ return noProgressTimeout;
+ }
+
public static class Builder implements org.opendaylight.yangtools.concepts.Builder<DatastoreContext> {
private final DatastoreContext datastoreContext;
private int maxShardDataChangeExecutorPoolSize =
return this;
}
- public Builder fileBackedStreamingThresholdInMegabytes(final int fileBackedStreamingThreshold) {
+ public Builder fileBackedStreamingThresholdInMegabytes(final int fileBackedStreamingThreshold) {
datastoreContext.setFileBackedStreamingThreshold(fileBackedStreamingThreshold * ConfigParams.MEGABYTE);
return this;
}
return this;
}
+ public Builder backendAlivenessTimerIntervalInSeconds(final long interval) {
+ datastoreContext.backendAlivenessTimerInterval = TimeUnit.SECONDS.toNanos(interval);
+ return this;
+ }
+
+ public Builder frontendRequestTimeoutInSeconds(final long timeout) {
+ datastoreContext.requestTimeout = TimeUnit.SECONDS.toNanos(timeout);
+ return this;
+ }
+
+ public Builder frontendNoProgressTimeoutInSeconds(final long timeout) {
+ datastoreContext.noProgressTimeout = TimeUnit.SECONDS.toNanos(timeout);
+ return this;
+ }
+
@Override
public DatastoreContext build() {
datastoreContext.dataStoreProperties = InMemoryDOMDataStoreConfigProperties.create(
.maximumMessageSliceSize(props.getMaximumMessageSliceSize().getValue().intValue())
.useTellBasedProtocol(props.getUseTellBasedProtocol())
.syncIndexThreshold(props.getSyncIndexThreshold().getValue())
+ .backendAlivenessTimerIntervalInSeconds(props.getBackendAlivenessTimerIntervalInSeconds().getValue())
+ .frontendRequestTimeoutInSeconds(props.getFrontendRequestTimeoutInSeconds().getValue())
+ .frontendNoProgressTimeoutInSeconds(props.getFrontendNoProgressTimeoutInSeconds().getValue())
.build();
}
.maximumMessageSliceSize(props.getMaximumMessageSliceSize().getValue().intValue())
.useTellBasedProtocol(props.getUseTellBasedProtocol())
.syncIndexThreshold(props.getSyncIndexThreshold().getValue())
+ .backendAlivenessTimerIntervalInSeconds(props.getBackendAlivenessTimerIntervalInSeconds().getValue())
+ .frontendRequestTimeoutInSeconds(props.getFrontendRequestTimeoutInSeconds().getValue())
+ .frontendNoProgressTimeoutInSeconds(props.getFrontendNoProgressTimeoutInSeconds().getValue())
.build();
}
commitIndex trails the leader's journal by more than this amount of entries the follower
is considered to be out-of-sync.";
}
+
+ leaf backend-aliveness-timer-interval-in-seconds {
+ default 30;
+ type non-zero-uint32-type;
+ description "The timer interval whereby, on expiration after response inactivity from the back-end,
+ the connection to the back-end is torn down and reconnection is attempted.";
+ }
+
+ leaf frontend-request-timeout-in-seconds {
+ default 120; // 2 minutes
+ type non-zero-uint32-type;
+ description "The timeout interval whereby client frontend transaction requests are failed.";
+ }
+
+ leaf frontend-no-progress-timeout-in-seconds {
+ default 900; // 15 minutes
+ type non-zero-uint32-type;
+ description "The timeout interval whereby the client front-end hasn't made progress with the
+ back-end on any request and terminates.";
+ }
}
// Augments the 'configuration' choice node under modules/module.