import static java.util.Objects.requireNonNull;
import static org.opendaylight.netconf.client.mdsal.impl.NetconfBaseOps.getSourceNode;
import static org.opendaylight.netconf.client.mdsal.impl.NetconfMessageTransformUtil.NETCONF_GET_CONFIG_NODEID;
-import static org.opendaylight.netconf.client.mdsal.impl.NetconfMessageTransformUtil.NETCONF_GET_CONFIG_QNAME;
import static org.opendaylight.netconf.client.mdsal.impl.NetconfMessageTransformUtil.NETCONF_RUNNING_NODEID;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
-import java.util.concurrent.ScheduledExecutorService;
+import io.netty.util.Timeout;
+import io.netty.util.TimerTask;
+import java.util.concurrent.CancellationException;
import java.util.concurrent.TimeUnit;
import javax.xml.transform.dom.DOMSource;
import org.checkerframework.checker.lock.qual.GuardedBy;
import org.opendaylight.netconf.client.mdsal.api.RemoteDeviceServices;
import org.opendaylight.netconf.client.mdsal.api.RemoteDeviceServices.Rpcs;
import org.opendaylight.netconf.client.mdsal.impl.NetconfMessageTransformUtil;
+import org.opendaylight.netconf.common.NetconfTimer;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.base._1._0.rev110601.GetConfig;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
import org.opendaylight.yangtools.yang.common.QName;
import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
// 1 minute transaction timeout by default
private static final long DEFAULT_TRANSACTION_TIMEOUT_MILLI = TimeUnit.MILLISECONDS.toMillis(60000);
- private final RemoteDeviceHandler salFacade;
- private final ScheduledExecutorService executor;
-
+ private final RemoteDeviceHandler deviceHandler;
+ private final RemoteDeviceId deviceId;
+ private final NetconfTimer timer;
private final long keepaliveDelaySeconds;
private final long timeoutNanos;
private final long delayNanos;
- private final RemoteDeviceId id;
-
private volatile NetconfDeviceCommunicator listener;
private volatile KeepaliveTask task;
- public KeepaliveSalFacade(final RemoteDeviceId id, final RemoteDeviceHandler salFacade,
- final ScheduledExecutorService executor, final long keepaliveDelaySeconds,
- final long requestTimeoutMillis) {
- this.id = id;
- this.salFacade = salFacade;
- this.executor = requireNonNull(executor);
+ public KeepaliveSalFacade(final RemoteDeviceId deviceId, final RemoteDeviceHandler deviceHandler,
+ final NetconfTimer timer, final long keepaliveDelaySeconds, final long requestTimeoutMillis) {
+ this.deviceId = requireNonNull(deviceId);
+ this.deviceHandler = requireNonNull(deviceHandler);
+ this.timer = requireNonNull(timer);
this.keepaliveDelaySeconds = keepaliveDelaySeconds;
delayNanos = TimeUnit.SECONDS.toNanos(keepaliveDelaySeconds);
timeoutNanos = TimeUnit.MILLISECONDS.toNanos(requestTimeoutMillis);
}
- public KeepaliveSalFacade(final RemoteDeviceId id, final RemoteDeviceHandler salFacade,
- final ScheduledExecutorService executor) {
- this(id, salFacade, executor, DEFAULT_DELAY, DEFAULT_TRANSACTION_TIMEOUT_MILLI);
+ public KeepaliveSalFacade(final RemoteDeviceId deviceId, final RemoteDeviceHandler deviceHandler,
+ final NetconfTimer timer) {
+ this(deviceId, deviceHandler, timer, DEFAULT_DELAY, DEFAULT_TRANSACTION_TIMEOUT_MILLI);
}
/**
}
}
- void reconnect() {
- checkState(listener != null, "%s: Unable to reconnect, session listener is missing", id);
+ private void disconnect() {
+ checkState(listener != null, "%s: Unable to reconnect, session listener is missing", deviceId);
stopKeepalives();
- LOG.info("{}: Reconnecting inactive netconf session", id);
+ LOG.info("{}: Reconnecting inactive netconf session", deviceId);
listener.disconnect();
}
throw new IllegalStateException("Unhandled " + devRpc);
}
- salFacade.onDeviceConnected(deviceSchema, sessionPreferences, new RemoteDeviceServices(keepaliveRpcs,
+ deviceHandler.onDeviceConnected(deviceSchema, sessionPreferences, new RemoteDeviceServices(keepaliveRpcs,
// FIXME: wrap with keepalive
services.actions()));
// We have performed a callback, which might have termined keepalives
final var localTask = task;
if (localTask != null) {
- LOG.debug("{}: Netconf session initiated, starting keepalives", id);
- LOG.trace("{}: Scheduling keepalives every {}s", id, keepaliveDelaySeconds);
+ LOG.debug("{}: Netconf session initiated, starting keepalives", deviceId);
+ LOG.trace("{}: Scheduling keepalives every {}s", deviceId, keepaliveDelaySeconds);
localTask.enableKeepalive();
}
}
@Override
public void onDeviceDisconnected() {
stopKeepalives();
- salFacade.onDeviceDisconnected();
+ deviceHandler.onDeviceDisconnected();
}
@Override
public void onDeviceFailed(final Throwable throwable) {
stopKeepalives();
- salFacade.onDeviceFailed(throwable);
+ deviceHandler.onDeviceFailed(throwable);
}
@Override
if (localTask != null) {
localTask.recordActivity();
}
- salFacade.onNotification(domNotification);
+ deviceHandler.onNotification(domNotification);
}
@Override
public void close() {
stopKeepalives();
- salFacade.close();
+ deviceHandler.close();
}
private <T> @NonNull ListenableFuture<T> scheduleTimeout(final ListenableFuture<T> invokeFuture) {
final var timeout = new RequestTimeoutTask<>(invokeFuture);
- final var timeoutFuture = executor.schedule(timeout, timeoutNanos, TimeUnit.NANOSECONDS);
- invokeFuture.addListener(() -> timeoutFuture.cancel(false), MoreExecutors.directExecutor());
+ scheduleTimeout(invokeFuture, timeout);
return timeout.userFuture;
}
+ private void scheduleTimeout(final ListenableFuture<?> future, final TimeoutTask timeoutTask) {
+ final var timeout = timer.newTimeout(timeoutTask, timeoutNanos, TimeUnit.NANOSECONDS);
+ future.addListener(() -> timeout.cancel(), MoreExecutors.directExecutor());
+ }
+
/**
* Invoke keepalive RPC and check the response. In case of any received response the keepalive
* is considered successful and schedules next keepalive with a fixed delay. If the response is unsuccessful (no
* response received, or the rcp could not even be sent) immediate reconnect is triggered as netconf session
* is considered inactive/failed.
*/
- private final class KeepaliveTask implements Runnable, FutureCallback<DOMRpcResult> {
+ private final class KeepaliveTask implements TimerTask, FutureCallback<DOMRpcResult> {
// Keepalive RPC static resources
static final @NonNull ContainerNode KEEPALIVE_PAYLOAD = NetconfMessageTransformUtil.wrap(
NETCONF_GET_CONFIG_NODEID, getSourceNode(NETCONF_RUNNING_NODEID), NetconfMessageTransformUtil.EMPTY_FILTER);
}
@Override
- public void run() {
+ public void run(final Timeout timeout) {
final long local = lastActivity;
final long now = System.nanoTime();
final long inFutureNanos = local + delayNanos - now;
synchronized void enableKeepalive() {
recordActivity();
- if (!suppressed) {
- // unscheduled -> unsuppressed
- reschedule();
- } else {
+ if (suppressed) {
// suppressed -> unsuppressed
suppressed = false;
+ } else {
+ // unscheduled -> unsuppressed
+ reschedule();
}
}
private synchronized void sendKeepalive(final long now) {
if (suppressed) {
- LOG.debug("{}: Skipping keepalive while disabled", id);
+ LOG.debug("{}: Skipping keepalive while disabled", deviceId);
// suppressed -> unscheduled
suppressed = false;
return;
}
- LOG.trace("{}: Invoking keepalive RPC", id);
- final var deviceFuture = devRpc.invokeNetconf(NETCONF_GET_CONFIG_QNAME, KEEPALIVE_PAYLOAD);
+ LOG.trace("{}: Invoking keepalive RPC", deviceId);
+ final var deviceFuture = devRpc.invokeNetconf(GetConfig.QNAME, KEEPALIVE_PAYLOAD);
lastActivity = now;
+
+ scheduleTimeout(deviceFuture, new TimeoutTask(deviceFuture));
Futures.addCallback(deviceFuture, this, MoreExecutors.directExecutor());
}
// No matter what response we got, rpc-reply or rpc-error,
// we got it from device so the netconf session is OK
if (result == null) {
- LOG.warn("{} Keepalive RPC returned null with response. Reconnecting netconf session", id);
- reconnect();
+ LOG.warn("{} Keepalive RPC returned null with response. Reconnecting netconf session", deviceId);
+ disconnect();
return;
}
} else {
final var errors = result.errors();
if (!errors.isEmpty()) {
- LOG.warn("{}: Keepalive RPC failed with error: {}", id, errors);
+ LOG.warn("{}: Keepalive RPC failed with error: {}", deviceId, errors);
reschedule();
} else {
- LOG.warn("{} Keepalive RPC returned null with response. Reconnecting netconf session", id);
- reconnect();
+ LOG.warn("{} Keepalive RPC returned null with response. Reconnecting netconf session", deviceId);
+ disconnect();
}
}
}
@Override
public void onFailure(final Throwable throwable) {
- LOG.warn("{}: Keepalive RPC failed. Reconnecting netconf session.", id, throwable);
- reconnect();
+ if (throwable instanceof CancellationException) {
+ LOG.warn("{}: Keepalive RPC timed out. Reconnecting netconf session.", deviceId);
+ } else {
+ LOG.warn("{}: Keepalive RPC failed. Reconnecting netconf session.", deviceId, throwable);
+ }
+ disconnect();
}
private void reschedule() {
}
private void reschedule(final long delay) {
- executor.schedule(this, delay, TimeUnit.NANOSECONDS);
+ timer.newTimeout(this, delay, TimeUnit.NANOSECONDS);
+ }
+ }
+
+ private static class TimeoutTask implements TimerTask {
+ private final ListenableFuture<?> future;
+
+ TimeoutTask(final ListenableFuture<?> future) {
+ this.future = requireNonNull(future);
+ }
+
+ @Override
+ public final void run(final Timeout timeout) {
+ future.cancel(true);
}
}
* Request timeout task is called once the requestTimeoutMillis is reached. At that moment, if the request is not
* yet finished, we cancel it.
*/
- private final class RequestTimeoutTask<V> implements FutureCallback<V>, Runnable {
+ private final class RequestTimeoutTask<V> extends TimeoutTask implements FutureCallback<V> {
private final @NonNull SettableFuture<V> userFuture = SettableFuture.create();
- private final @NonNull ListenableFuture<? extends V> rpcResultFuture;
RequestTimeoutTask(final ListenableFuture<V> rpcResultFuture) {
- this.rpcResultFuture = requireNonNull(rpcResultFuture);
+ super(rpcResultFuture);
+ // Note: this will also wire run() to onFailure()
Futures.addCallback(rpcResultFuture, this, MoreExecutors.directExecutor());
}
- @Override
- public void run() {
- rpcResultFuture.cancel(true);
- userFuture.cancel(false);
- enableKeepalive();
- }
-
@Override
public void onSuccess(final V result) {
// No matter what response we got,
@Override
public void onFailure(final Throwable throwable) {
- // User/Application RPC failed (The RPC did not reach the remote device or ...)
- // FIXME: what other reasons could cause this ?)
- LOG.warn("{}: Rpc failure detected. Reconnecting netconf session", id, throwable);
+ // User/Application RPC failed (The RPC did not reach the remote device or it has timeed out)
+ if (throwable instanceof CancellationException) {
+ LOG.warn("{}: RPC timed out. Reconnecting netconf session", deviceId);
+ } else {
+ LOG.warn("{}: RPC failed. Reconnecting netconf session", deviceId, throwable);
+ }
userFuture.setException(throwable);
// There is no point in keeping this session. Reconnect.
- reconnect();
+ disconnect();
}
}