import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
+import io.netty.util.Timeout;
+import io.netty.util.Timer;
+import io.netty.util.TimerTask;
import java.util.concurrent.CancellationException;
-import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import javax.xml.transform.dom.DOMSource;
import org.checkerframework.checker.lock.qual.GuardedBy;
// 1 minute transaction timeout by default
private static final long DEFAULT_TRANSACTION_TIMEOUT_MILLI = TimeUnit.MILLISECONDS.toMillis(60000);
- private final RemoteDeviceHandler salFacade;
- private final ScheduledExecutorService executor;
-
+ private final RemoteDeviceHandler deviceHandler;
+ private final RemoteDeviceId deviceId;
+ private final Timer timer;
private final long keepaliveDelaySeconds;
private final long timeoutNanos;
private final long delayNanos;
- private final RemoteDeviceId id;
-
private volatile NetconfDeviceCommunicator listener;
private volatile KeepaliveTask task;
- public KeepaliveSalFacade(final RemoteDeviceId id, final RemoteDeviceHandler salFacade,
- final ScheduledExecutorService executor, final long keepaliveDelaySeconds,
- final long requestTimeoutMillis) {
- this.id = id;
- this.salFacade = salFacade;
- this.executor = requireNonNull(executor);
+ public KeepaliveSalFacade(final RemoteDeviceId deviceId, final RemoteDeviceHandler deviceHandler, final Timer timer,
+ final long keepaliveDelaySeconds, final long requestTimeoutMillis) {
+ this.deviceId = requireNonNull(deviceId);
+ this.deviceHandler = requireNonNull(deviceHandler);
+ this.timer = requireNonNull(timer);
this.keepaliveDelaySeconds = keepaliveDelaySeconds;
delayNanos = TimeUnit.SECONDS.toNanos(keepaliveDelaySeconds);
timeoutNanos = TimeUnit.MILLISECONDS.toNanos(requestTimeoutMillis);
}
- public KeepaliveSalFacade(final RemoteDeviceId id, final RemoteDeviceHandler salFacade,
- final ScheduledExecutorService executor) {
- this(id, salFacade, executor, DEFAULT_DELAY, DEFAULT_TRANSACTION_TIMEOUT_MILLI);
+ public KeepaliveSalFacade(final RemoteDeviceId deviceId, final RemoteDeviceHandler deviceHandler,
+ final Timer timer) {
+ this(deviceId, deviceHandler, timer, DEFAULT_DELAY, DEFAULT_TRANSACTION_TIMEOUT_MILLI);
}
/**
}
}
- void reconnect() {
- checkState(listener != null, "%s: Unable to reconnect, session listener is missing", id);
+ private void disconnect() {
+ checkState(listener != null, "%s: Unable to reconnect, session listener is missing", deviceId);
stopKeepalives();
- LOG.info("{}: Reconnecting inactive netconf session", id);
+ LOG.info("{}: Reconnecting inactive netconf session", deviceId);
listener.disconnect();
}
throw new IllegalStateException("Unhandled " + devRpc);
}
- salFacade.onDeviceConnected(deviceSchema, sessionPreferences, new RemoteDeviceServices(keepaliveRpcs,
+ deviceHandler.onDeviceConnected(deviceSchema, sessionPreferences, new RemoteDeviceServices(keepaliveRpcs,
// FIXME: wrap with keepalive
services.actions()));
// We have performed a callback, which might have termined keepalives
final var localTask = task;
if (localTask != null) {
- LOG.debug("{}: Netconf session initiated, starting keepalives", id);
- LOG.trace("{}: Scheduling keepalives every {}s", id, keepaliveDelaySeconds);
+ LOG.debug("{}: Netconf session initiated, starting keepalives", deviceId);
+ LOG.trace("{}: Scheduling keepalives every {}s", deviceId, keepaliveDelaySeconds);
localTask.enableKeepalive();
}
}
@Override
public void onDeviceDisconnected() {
stopKeepalives();
- salFacade.onDeviceDisconnected();
+ deviceHandler.onDeviceDisconnected();
}
@Override
public void onDeviceFailed(final Throwable throwable) {
stopKeepalives();
- salFacade.onDeviceFailed(throwable);
+ deviceHandler.onDeviceFailed(throwable);
}
@Override
if (localTask != null) {
localTask.recordActivity();
}
- salFacade.onNotification(domNotification);
+ deviceHandler.onNotification(domNotification);
}
@Override
public void close() {
stopKeepalives();
- salFacade.close();
+ deviceHandler.close();
}
private <T> @NonNull ListenableFuture<T> scheduleTimeout(final ListenableFuture<T> invokeFuture) {
return timeout.userFuture;
}
- private void scheduleTimeout(final ListenableFuture<?> future, final TimeoutTask timeout) {
- final var timeoutFuture = executor.schedule(timeout, timeoutNanos, TimeUnit.NANOSECONDS);
- future.addListener(() -> timeoutFuture.cancel(false), MoreExecutors.directExecutor());
+ private void scheduleTimeout(final ListenableFuture<?> future, final TimeoutTask timeoutTask) {
+ final var timeout = timer.newTimeout(timeoutTask, timeoutNanos, TimeUnit.NANOSECONDS);
+ future.addListener(() -> timeout.cancel(), MoreExecutors.directExecutor());
}
/**
* response received, or the rcp could not even be sent) immediate reconnect is triggered as netconf session
* is considered inactive/failed.
*/
- private final class KeepaliveTask implements Runnable, FutureCallback<DOMRpcResult> {
+ private final class KeepaliveTask implements TimerTask, FutureCallback<DOMRpcResult> {
// Keepalive RPC static resources
static final @NonNull ContainerNode KEEPALIVE_PAYLOAD = NetconfMessageTransformUtil.wrap(
NETCONF_GET_CONFIG_NODEID, getSourceNode(NETCONF_RUNNING_NODEID), NetconfMessageTransformUtil.EMPTY_FILTER);
}
@Override
- public void run() {
+ public void run(final Timeout timeout) {
final long local = lastActivity;
final long now = System.nanoTime();
final long inFutureNanos = local + delayNanos - now;
private synchronized void sendKeepalive(final long now) {
if (suppressed) {
- LOG.debug("{}: Skipping keepalive while disabled", id);
+ LOG.debug("{}: Skipping keepalive while disabled", deviceId);
// suppressed -> unscheduled
suppressed = false;
return;
}
- LOG.trace("{}: Invoking keepalive RPC", id);
+ LOG.trace("{}: Invoking keepalive RPC", deviceId);
final var deviceFuture = devRpc.invokeNetconf(NETCONF_GET_CONFIG_QNAME, KEEPALIVE_PAYLOAD);
lastActivity = now;
// No matter what response we got, rpc-reply or rpc-error,
// we got it from device so the netconf session is OK
if (result == null) {
- LOG.warn("{} Keepalive RPC returned null with response. Reconnecting netconf session", id);
- reconnect();
+ LOG.warn("{} Keepalive RPC returned null with response. Reconnecting netconf session", deviceId);
+ disconnect();
return;
}
} else {
final var errors = result.errors();
if (!errors.isEmpty()) {
- LOG.warn("{}: Keepalive RPC failed with error: {}", id, errors);
+ LOG.warn("{}: Keepalive RPC failed with error: {}", deviceId, errors);
reschedule();
} else {
- LOG.warn("{} Keepalive RPC returned null with response. Reconnecting netconf session", id);
- reconnect();
+ LOG.warn("{} Keepalive RPC returned null with response. Reconnecting netconf session", deviceId);
+ disconnect();
}
}
}
@Override
public void onFailure(final Throwable throwable) {
if (throwable instanceof CancellationException) {
- LOG.warn("{}: Keepalive RPC timed out. Reconnecting netconf session.", id);
+ LOG.warn("{}: Keepalive RPC timed out. Reconnecting netconf session.", deviceId);
} else {
- LOG.warn("{}: Keepalive RPC failed. Reconnecting netconf session.", id, throwable);
+ LOG.warn("{}: Keepalive RPC failed. Reconnecting netconf session.", deviceId, throwable);
}
- reconnect();
+ disconnect();
}
private void reschedule() {
}
private void reschedule(final long delay) {
- executor.schedule(this, delay, TimeUnit.NANOSECONDS);
+ timer.newTimeout(this, delay, TimeUnit.NANOSECONDS);
}
}
- private static class TimeoutTask implements Runnable {
+ private static class TimeoutTask implements TimerTask {
private final ListenableFuture<?> future;
TimeoutTask(final ListenableFuture<?> future) {
}
@Override
- public final void run() {
+ public final void run(final Timeout timeout) {
future.cancel(true);
}
}
public void onFailure(final Throwable throwable) {
// User/Application RPC failed (The RPC did not reach the remote device or it has timeed out)
if (throwable instanceof CancellationException) {
- LOG.warn("{}: RPC timed out. Reconnecting netconf session", id);
+ LOG.warn("{}: RPC timed out. Reconnecting netconf session", deviceId);
} else {
- LOG.warn("{}: RPC failed. Reconnecting netconf session", id, throwable);
+ LOG.warn("{}: RPC failed. Reconnecting netconf session", deviceId, throwable);
}
userFuture.setException(throwable);
// There is no point in keeping this session. Reconnect.
- reconnect();
+ disconnect();
}
}