import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
+import java.util.concurrent.CancellationException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import javax.xml.transform.dom.DOMSource;
private <T> @NonNull ListenableFuture<T> scheduleTimeout(final ListenableFuture<T> invokeFuture) {
final var timeout = new RequestTimeoutTask<>(invokeFuture);
- final var timeoutFuture = executor.schedule(timeout, timeoutNanos, TimeUnit.NANOSECONDS);
- invokeFuture.addListener(() -> timeoutFuture.cancel(false), MoreExecutors.directExecutor());
+ scheduleTimeout(invokeFuture, timeout);
return timeout.userFuture;
}
+ private void scheduleTimeout(final ListenableFuture<?> future, final TimeoutTask timeout) {
+ final var timeoutFuture = executor.schedule(timeout, timeoutNanos, TimeUnit.NANOSECONDS);
+ future.addListener(() -> timeoutFuture.cancel(false), MoreExecutors.directExecutor());
+ }
+
/**
* Invoke keepalive RPC and check the response. In case of any received response the keepalive
* is considered successful and schedules next keepalive with a fixed delay. If the response is unsuccessful (no
LOG.trace("{}: Invoking keepalive RPC", id);
final var deviceFuture = devRpc.invokeNetconf(NETCONF_GET_CONFIG_QNAME, KEEPALIVE_PAYLOAD);
lastActivity = now;
+
+ scheduleTimeout(deviceFuture, new TimeoutTask(deviceFuture));
Futures.addCallback(deviceFuture, this, MoreExecutors.directExecutor());
}
@Override
public void onFailure(final Throwable throwable) {
- LOG.warn("{}: Keepalive RPC failed. Reconnecting netconf session.", id, throwable);
+ if (throwable instanceof CancellationException) {
+ LOG.warn("{}: Keepalive RPC timed out. Reconnecting netconf session.", id);
+ } else {
+ LOG.warn("{}: Keepalive RPC failed. Reconnecting netconf session.", id, throwable);
+ }
reconnect();
}
}
}
+ private static class TimeoutTask implements Runnable {
+ private final ListenableFuture<?> future;
+
+ TimeoutTask(final ListenableFuture<?> future) {
+ this.future = requireNonNull(future);
+ }
+
+ @Override
+ public final void run() {
+ future.cancel(true);
+ }
+ }
+
/*
* Request timeout task is called once the requestTimeoutMillis is reached. At that moment, if the request is not
* yet finished, we cancel it.
*/
- private final class RequestTimeoutTask<V> implements FutureCallback<V>, Runnable {
+ private final class RequestTimeoutTask<V> extends TimeoutTask implements FutureCallback<V> {
private final @NonNull SettableFuture<V> userFuture = SettableFuture.create();
- private final @NonNull ListenableFuture<? extends V> rpcResultFuture;
RequestTimeoutTask(final ListenableFuture<V> rpcResultFuture) {
- this.rpcResultFuture = requireNonNull(rpcResultFuture);
+ super(rpcResultFuture);
+ // Note: this will also wire run() to onFailure()
Futures.addCallback(rpcResultFuture, this, MoreExecutors.directExecutor());
}
- @Override
- public void run() {
- // Note: this will loop to onFailure()
- rpcResultFuture.cancel(true);
- }
-
@Override
public void onSuccess(final V result) {
// No matter what response we got,
@Override
public void onFailure(final Throwable throwable) {
- // User/Application RPC failed (The RPC did not reach the remote device or ...)
- // FIXME: what other reasons could cause this ?)
- LOG.warn("{}: Rpc failure detected. Reconnecting netconf session", id, throwable);
+ // User/Application RPC failed (The RPC did not reach the remote device or it has timeed out)
+ if (throwable instanceof CancellationException) {
+ LOG.warn("{}: RPC timed out. Reconnecting netconf session", id);
+ } else {
+ LOG.warn("{}: RPC failed. Reconnecting netconf session", id, throwable);
+ }
userFuture.setException(throwable);
// There is no point in keeping this session. Reconnect.
reconnect();
import static org.mockito.Mockito.verify;
import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.SettableFuture;
import java.net.InetSocketAddress;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
verify(listener, times(1)).disconnect();
}
+
+ @Test
+ public void testKeepaliveRpcResponseTimeout() {
+ final var neverResolvedFuture = SettableFuture.create();
+ doReturn(neverResolvedFuture).when(deviceRpc).invokeNetconf(any(), any());
+
+ keepaliveSalFacade.onDeviceConnected(null, null, new RemoteDeviceServices(deviceRpc, null));
+
+ verify(underlyingSalFacade).onDeviceConnected(isNull(), isNull(), any(RemoteDeviceServices.class));
+
+ // Should disconnect the session because RPC result future is never resolved and keepalive delay is 1 sec
+ verify(listener, timeout(115000).times(1)).disconnect();
+ verify(deviceRpc, times(1)).invokeNetconf(any(), any());
+ }
}