// 1 minute transaction timeout by default
private static final long DEFAULT_TRANSACTION_TIMEOUT_MILLI = TimeUnit.MILLISECONDS.toMillis(60000);
- private final KeepaliveTask keepaliveTask = new KeepaliveTask();
private final RemoteDeviceHandler salFacade;
private final ScheduledExecutorService executor;
private final RemoteDeviceId id;
private volatile NetconfDeviceCommunicator listener;
- private volatile RemoteDeviceServices currentServices;
+ private volatile KeepaliveTask task;
public KeepaliveSalFacade(final RemoteDeviceId id, final RemoteDeviceHandler salFacade,
final ScheduledExecutorService executor, final long keepaliveDelaySeconds,
}
/**
- * Cancel current keepalive and also reset current deviceRpc.
+ * Cancel current keepalive and free it.
*/
private synchronized void stopKeepalives() {
- keepaliveTask.disableKeepalive();
- currentServices = null;
+ final var localTask = task;
+ if (localTask != null) {
+ localTask.disableKeepalive();
+ task = null;
+ }
+ }
+
+ private void disableKeepalive() {
+ final var localTask = task;
+ if (localTask != null) {
+ localTask.disableKeepalive();
+ }
+ }
+
+ private void enableKeepalive() {
+ final var localTask = task;
+ if (localTask != null) {
+ localTask.enableKeepalive();
+ }
}
void reconnect() {
@Override
public void onDeviceConnected(final NetconfDeviceSchema deviceSchema,
final NetconfSessionPreferences sessionPreferences, final RemoteDeviceServices services) {
- currentServices = requireNonNull(services);
+ final var devRpc = services.rpcs();
+ task = new KeepaliveTask(devRpc);
final var devAction = services.actions();
// FIXME: wrap with keepalive
final var kaAction = devAction;
salFacade.onDeviceConnected(deviceSchema, sessionPreferences,
- new RemoteDeviceServices(new KeepaliveDOMRpcService(services.rpcs()), kaAction));
-
- LOG.debug("{}: Netconf session initiated, starting keepalives", id);
- LOG.trace("{}: Scheduling keepalives every {}s", id, keepaliveDelaySeconds);
- keepaliveTask.enableKeepalive();
+ new RemoteDeviceServices(new KeepaliveDOMRpcService(devRpc), kaAction));
+
+ // We have performed a callback, which might have termined keepalives
+ final var localTask = task;
+ if (localTask != null) {
+ LOG.debug("{}: Netconf session initiated, starting keepalives", id);
+ LOG.trace("{}: Scheduling keepalives every {}s", id, keepaliveDelaySeconds);
+ localTask.enableKeepalive();
+ }
}
@Override
@Override
public void onNotification(final DOMNotification domNotification) {
- keepaliveTask.recordActivity();
+ final var localTask = task;
+ if (localTask != null) {
+ localTask.recordActivity();
+ }
salFacade.onNotification(domNotification);
}
* is considered inactive/failed.
*/
private final class KeepaliveTask implements Runnable, FutureCallback<DOMRpcResult> {
- private volatile long lastActivity;
+ private final DOMRpcService devRpc;
+
@GuardedBy("this")
- private boolean suppressed;
+ private boolean suppressed = false;
+
+ private volatile long lastActivity;
- KeepaliveTask() {
- suppressed = false;
+ KeepaliveTask(final DOMRpcService devRpc) {
+ this.devRpc = requireNonNull(devRpc);
}
@Override
private synchronized void sendKeepalive(final long now) {
if (suppressed) {
+ LOG.debug("{}: Skipping keepalive while disabled", id);
// suppressed -> unscheduled
suppressed = false;
return;
}
- final var localServices = currentServices;
- if (localServices == null) {
- // deviceRpc is null, which means we hit the reconnect window and attempted to send keepalive while
- // we were reconnecting. Next keepalive will be scheduled after reconnect so no action necessary here.
- LOG.debug("{}: Skipping keepalive while reconnecting", id);
- return;
- }
-
LOG.trace("{}: Invoking keepalive RPC", id);
- final var deviceFuture = localServices.rpcs().invokeRpc(NETCONF_GET_CONFIG_QNAME, KEEPALIVE_PAYLOAD);
+ final var deviceFuture = devRpc.invokeRpc(NETCONF_GET_CONFIG_QNAME, KEEPALIVE_PAYLOAD);
lastActivity = now;
Futures.addCallback(deviceFuture, this, MoreExecutors.directExecutor());
public void run() {
deviceFuture.cancel(true);
userFuture.cancel(false);
- keepaliveTask.enableKeepalive();
+ enableKeepalive();
}
@Override
// No matter what response we got,
// rpc-reply or rpc-error, we got it from device so the netconf session is OK.
userFuture.set(result);
- keepaliveTask.enableKeepalive();
+ enableKeepalive();
}
@Override
@Override
public ListenableFuture<? extends DOMRpcResult> invokeRpc(final QName type, final NormalizedNode input) {
- keepaliveTask.disableKeepalive();
+ disableKeepalive();
final ListenableFuture<? extends DOMRpcResult> deviceFuture = deviceRpc.invokeRpc(type, input);
final RequestTimeoutTask timeout = new RequestTimeoutTask(deviceFuture);