Introduce NetconfTimer
[netconf.git] / plugins / netconf-client-mdsal / src / main / java / org / opendaylight / netconf / client / mdsal / spi / KeepaliveSalFacade.java
index a169dec871b1f75e684145ed543a0392abf2e6d1..d9551a7b026bed17628ada2fe858f838bc54f942 100644 (file)
@@ -11,7 +11,6 @@ import static com.google.common.base.Preconditions.checkState;
 import static java.util.Objects.requireNonNull;
 import static org.opendaylight.netconf.client.mdsal.impl.NetconfBaseOps.getSourceNode;
 import static org.opendaylight.netconf.client.mdsal.impl.NetconfMessageTransformUtil.NETCONF_GET_CONFIG_NODEID;
-import static org.opendaylight.netconf.client.mdsal.impl.NetconfMessageTransformUtil.NETCONF_GET_CONFIG_QNAME;
 import static org.opendaylight.netconf.client.mdsal.impl.NetconfMessageTransformUtil.NETCONF_RUNNING_NODEID;
 
 import com.google.common.util.concurrent.FutureCallback;
@@ -19,7 +18,9 @@ import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.SettableFuture;
-import java.util.concurrent.ScheduledExecutorService;
+import io.netty.util.Timeout;
+import io.netty.util.TimerTask;
+import java.util.concurrent.CancellationException;
 import java.util.concurrent.TimeUnit;
 import javax.xml.transform.dom.DOMSource;
 import org.checkerframework.checker.lock.qual.GuardedBy;
@@ -35,6 +36,8 @@ import org.opendaylight.netconf.client.mdsal.api.RemoteDeviceId;
 import org.opendaylight.netconf.client.mdsal.api.RemoteDeviceServices;
 import org.opendaylight.netconf.client.mdsal.api.RemoteDeviceServices.Rpcs;
 import org.opendaylight.netconf.client.mdsal.impl.NetconfMessageTransformUtil;
+import org.opendaylight.netconf.common.NetconfTimer;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.base._1._0.rev110601.GetConfig;
 import org.opendaylight.yangtools.concepts.ListenerRegistration;
 import org.opendaylight.yangtools.yang.common.QName;
 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
@@ -55,32 +58,29 @@ public final class KeepaliveSalFacade implements RemoteDeviceHandler {
     // 1 minute transaction timeout by default
     private static final long DEFAULT_TRANSACTION_TIMEOUT_MILLI = TimeUnit.MILLISECONDS.toMillis(60000);
 
-    private final RemoteDeviceHandler salFacade;
-    private final ScheduledExecutorService executor;
-
+    private final RemoteDeviceHandler deviceHandler;
+    private final RemoteDeviceId deviceId;
+    private final NetconfTimer timer;
     private final long keepaliveDelaySeconds;
     private final long timeoutNanos;
     private final long delayNanos;
 
-    private final RemoteDeviceId id;
-
     private volatile NetconfDeviceCommunicator listener;
     private volatile KeepaliveTask task;
 
-    public KeepaliveSalFacade(final RemoteDeviceId id, final RemoteDeviceHandler salFacade,
-            final ScheduledExecutorService executor, final long keepaliveDelaySeconds,
-            final long requestTimeoutMillis) {
-        this.id = id;
-        this.salFacade = salFacade;
-        this.executor = requireNonNull(executor);
+    public KeepaliveSalFacade(final RemoteDeviceId deviceId, final RemoteDeviceHandler deviceHandler,
+            final NetconfTimer timer, final long keepaliveDelaySeconds, final long requestTimeoutMillis) {
+        this.deviceId = requireNonNull(deviceId);
+        this.deviceHandler = requireNonNull(deviceHandler);
+        this.timer = requireNonNull(timer);
         this.keepaliveDelaySeconds = keepaliveDelaySeconds;
         delayNanos = TimeUnit.SECONDS.toNanos(keepaliveDelaySeconds);
         timeoutNanos = TimeUnit.MILLISECONDS.toNanos(requestTimeoutMillis);
     }
 
-    public KeepaliveSalFacade(final RemoteDeviceId id, final RemoteDeviceHandler salFacade,
-            final ScheduledExecutorService executor) {
-        this(id, salFacade, executor, DEFAULT_DELAY, DEFAULT_TRANSACTION_TIMEOUT_MILLI);
+    public KeepaliveSalFacade(final RemoteDeviceId deviceId, final RemoteDeviceHandler deviceHandler,
+            final NetconfTimer timer) {
+        this(deviceId, deviceHandler, timer, DEFAULT_DELAY, DEFAULT_TRANSACTION_TIMEOUT_MILLI);
     }
 
     /**
@@ -117,10 +117,10 @@ public final class KeepaliveSalFacade implements RemoteDeviceHandler {
         }
     }
 
-    void reconnect() {
-        checkState(listener != null, "%s: Unable to reconnect, session listener is missing", id);
+    private void disconnect() {
+        checkState(listener != null, "%s: Unable to reconnect, session listener is missing", deviceId);
         stopKeepalives();
-        LOG.info("{}: Reconnecting inactive netconf session", id);
+        LOG.info("{}: Reconnecting inactive netconf session", deviceId);
         listener.disconnect();
     }
 
@@ -139,15 +139,15 @@ public final class KeepaliveSalFacade implements RemoteDeviceHandler {
             throw new IllegalStateException("Unhandled " + devRpc);
         }
 
-        salFacade.onDeviceConnected(deviceSchema, sessionPreferences, new RemoteDeviceServices(keepaliveRpcs,
+        deviceHandler.onDeviceConnected(deviceSchema, sessionPreferences, new RemoteDeviceServices(keepaliveRpcs,
             // FIXME: wrap with keepalive
             services.actions()));
 
         // We have performed a callback, which might have termined keepalives
         final var localTask = task;
         if (localTask != null) {
-            LOG.debug("{}: Netconf session initiated, starting keepalives", id);
-            LOG.trace("{}: Scheduling keepalives every {}s", id, keepaliveDelaySeconds);
+            LOG.debug("{}: Netconf session initiated, starting keepalives", deviceId);
+            LOG.trace("{}: Scheduling keepalives every {}s", deviceId, keepaliveDelaySeconds);
             localTask.enableKeepalive();
         }
     }
@@ -155,13 +155,13 @@ public final class KeepaliveSalFacade implements RemoteDeviceHandler {
     @Override
     public void onDeviceDisconnected() {
         stopKeepalives();
-        salFacade.onDeviceDisconnected();
+        deviceHandler.onDeviceDisconnected();
     }
 
     @Override
     public void onDeviceFailed(final Throwable throwable) {
         stopKeepalives();
-        salFacade.onDeviceFailed(throwable);
+        deviceHandler.onDeviceFailed(throwable);
     }
 
     @Override
@@ -170,29 +170,33 @@ public final class KeepaliveSalFacade implements RemoteDeviceHandler {
         if (localTask != null) {
             localTask.recordActivity();
         }
-        salFacade.onNotification(domNotification);
+        deviceHandler.onNotification(domNotification);
     }
 
     @Override
     public void close() {
         stopKeepalives();
-        salFacade.close();
+        deviceHandler.close();
     }
 
     private <T> @NonNull ListenableFuture<T> scheduleTimeout(final ListenableFuture<T> invokeFuture) {
         final var timeout = new RequestTimeoutTask<>(invokeFuture);
-        final var timeoutFuture = executor.schedule(timeout, timeoutNanos, TimeUnit.NANOSECONDS);
-        invokeFuture.addListener(() -> timeoutFuture.cancel(false), MoreExecutors.directExecutor());
+        scheduleTimeout(invokeFuture, timeout);
         return timeout.userFuture;
     }
 
+    private void scheduleTimeout(final ListenableFuture<?> future, final TimeoutTask timeoutTask) {
+        final var timeout = timer.newTimeout(timeoutTask, timeoutNanos, TimeUnit.NANOSECONDS);
+        future.addListener(() -> timeout.cancel(), MoreExecutors.directExecutor());
+    }
+
     /**
      * Invoke keepalive RPC and check the response. In case of any received response the keepalive
      * is considered successful and schedules next keepalive with a fixed delay. If the response is unsuccessful (no
      * response received, or the rcp could not even be sent) immediate reconnect is triggered as netconf session
      * is considered inactive/failed.
      */
-    private final class KeepaliveTask implements Runnable, FutureCallback<DOMRpcResult> {
+    private final class KeepaliveTask implements TimerTask, FutureCallback<DOMRpcResult> {
         // Keepalive RPC static resources
         static final @NonNull ContainerNode KEEPALIVE_PAYLOAD = NetconfMessageTransformUtil.wrap(
             NETCONF_GET_CONFIG_NODEID, getSourceNode(NETCONF_RUNNING_NODEID), NetconfMessageTransformUtil.EMPTY_FILTER);
@@ -209,7 +213,7 @@ public final class KeepaliveSalFacade implements RemoteDeviceHandler {
         }
 
         @Override
-        public void run() {
+        public void run(final Timeout timeout) {
             final long local = lastActivity;
             final long now = System.nanoTime();
             final long inFutureNanos = local + delayNanos - now;
@@ -231,26 +235,28 @@ public final class KeepaliveSalFacade implements RemoteDeviceHandler {
 
         synchronized void enableKeepalive() {
             recordActivity();
-            if (!suppressed) {
-                // unscheduled -> unsuppressed
-                reschedule();
-            } else {
+            if (suppressed) {
                 // suppressed -> unsuppressed
                 suppressed = false;
+            } else {
+                // unscheduled -> unsuppressed
+                reschedule();
             }
         }
 
         private synchronized void sendKeepalive(final long now) {
             if (suppressed) {
-                LOG.debug("{}: Skipping keepalive while disabled", id);
+                LOG.debug("{}: Skipping keepalive while disabled", deviceId);
                 // suppressed -> unscheduled
                 suppressed = false;
                 return;
             }
 
-            LOG.trace("{}: Invoking keepalive RPC", id);
-            final var deviceFuture = devRpc.invokeNetconf(NETCONF_GET_CONFIG_QNAME, KEEPALIVE_PAYLOAD);
+            LOG.trace("{}: Invoking keepalive RPC", deviceId);
+            final var deviceFuture = devRpc.invokeNetconf(GetConfig.QNAME, KEEPALIVE_PAYLOAD);
             lastActivity = now;
+
+            scheduleTimeout(deviceFuture, new TimeoutTask(deviceFuture));
             Futures.addCallback(deviceFuture, this, MoreExecutors.directExecutor());
         }
 
@@ -259,8 +265,8 @@ public final class KeepaliveSalFacade implements RemoteDeviceHandler {
             // No matter what response we got, rpc-reply or rpc-error,
             // we got it from device so the netconf session is OK
             if (result == null) {
-                LOG.warn("{} Keepalive RPC returned null with response. Reconnecting netconf session", id);
-                reconnect();
+                LOG.warn("{} Keepalive RPC returned null with response. Reconnecting netconf session", deviceId);
+                disconnect();
                 return;
             }
 
@@ -269,19 +275,23 @@ public final class KeepaliveSalFacade implements RemoteDeviceHandler {
             } else {
                 final var errors = result.errors();
                 if (!errors.isEmpty()) {
-                    LOG.warn("{}: Keepalive RPC failed with error: {}", id, errors);
+                    LOG.warn("{}: Keepalive RPC failed with error: {}", deviceId, errors);
                     reschedule();
                 } else {
-                    LOG.warn("{} Keepalive RPC returned null with response. Reconnecting netconf session", id);
-                    reconnect();
+                    LOG.warn("{} Keepalive RPC returned null with response. Reconnecting netconf session", deviceId);
+                    disconnect();
                 }
             }
         }
 
         @Override
         public void onFailure(final Throwable throwable) {
-            LOG.warn("{}: Keepalive RPC failed. Reconnecting netconf session.", id, throwable);
-            reconnect();
+            if (throwable instanceof CancellationException) {
+                LOG.warn("{}: Keepalive RPC timed out. Reconnecting netconf session.", deviceId);
+            } else {
+                LOG.warn("{}: Keepalive RPC failed. Reconnecting netconf session.", deviceId, throwable);
+            }
+            disconnect();
         }
 
         private void reschedule() {
@@ -289,7 +299,20 @@ public final class KeepaliveSalFacade implements RemoteDeviceHandler {
         }
 
         private void reschedule(final long delay) {
-            executor.schedule(this, delay, TimeUnit.NANOSECONDS);
+            timer.newTimeout(this, delay, TimeUnit.NANOSECONDS);
+        }
+    }
+
+    private static class TimeoutTask implements TimerTask {
+        private final ListenableFuture<?> future;
+
+        TimeoutTask(final ListenableFuture<?> future) {
+            this.future = requireNonNull(future);
+        }
+
+        @Override
+        public final void run(final Timeout timeout) {
+            future.cancel(true);
         }
     }
 
@@ -297,22 +320,15 @@ public final class KeepaliveSalFacade implements RemoteDeviceHandler {
      * Request timeout task is called once the requestTimeoutMillis is reached. At that moment, if the request is not
      * yet finished, we cancel it.
      */
-    private final class RequestTimeoutTask<V> implements FutureCallback<V>, Runnable {
+    private final class RequestTimeoutTask<V> extends TimeoutTask implements FutureCallback<V> {
         private final @NonNull SettableFuture<V> userFuture = SettableFuture.create();
-        private final @NonNull ListenableFuture<? extends V> rpcResultFuture;
 
         RequestTimeoutTask(final ListenableFuture<V> rpcResultFuture) {
-            this.rpcResultFuture = requireNonNull(rpcResultFuture);
+            super(rpcResultFuture);
+            // Note: this will also wire run() to onFailure()
             Futures.addCallback(rpcResultFuture, this, MoreExecutors.directExecutor());
         }
 
-        @Override
-        public void run() {
-            rpcResultFuture.cancel(true);
-            userFuture.cancel(false);
-            enableKeepalive();
-        }
-
         @Override
         public void onSuccess(final V result) {
             // No matter what response we got,
@@ -323,12 +339,15 @@ public final class KeepaliveSalFacade implements RemoteDeviceHandler {
 
         @Override
         public void onFailure(final Throwable throwable) {
-            // User/Application RPC failed (The RPC did not reach the remote device or ...)
-            // FIXME: what other reasons could cause this ?)
-            LOG.warn("{}: Rpc failure detected. Reconnecting netconf session", id, throwable);
+            // User/Application RPC failed (The RPC did not reach the remote device or it has timeed out)
+            if (throwable instanceof CancellationException) {
+                LOG.warn("{}: RPC timed out. Reconnecting netconf session", deviceId);
+            } else {
+                LOG.warn("{}: RPC failed. Reconnecting netconf session", deviceId, throwable);
+            }
             userFuture.setException(throwable);
             // There is no point in keeping this session. Reconnect.
-            reconnect();
+            disconnect();
         }
     }