Use Timer for KeepaliveSalFacade
[netconf.git] / plugins / netconf-client-mdsal / src / main / java / org / opendaylight / netconf / client / mdsal / spi / KeepaliveSalFacade.java
index a974e057732a892e7edf42584fdc7617e962d5b5..0328ef9f5aa4a9db592e21ce80947f188bdf9056 100644 (file)
@@ -19,8 +19,10 @@ import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.SettableFuture;
+import io.netty.util.Timeout;
+import io.netty.util.Timer;
+import io.netty.util.TimerTask;
 import java.util.concurrent.CancellationException;
-import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import javax.xml.transform.dom.DOMSource;
 import org.checkerframework.checker.lock.qual.GuardedBy;
@@ -56,32 +58,29 @@ public final class KeepaliveSalFacade implements RemoteDeviceHandler {
     // 1 minute transaction timeout by default
     private static final long DEFAULT_TRANSACTION_TIMEOUT_MILLI = TimeUnit.MILLISECONDS.toMillis(60000);
 
-    private final RemoteDeviceHandler salFacade;
-    private final ScheduledExecutorService executor;
-
+    private final RemoteDeviceHandler deviceHandler;
+    private final RemoteDeviceId deviceId;
+    private final Timer timer;
     private final long keepaliveDelaySeconds;
     private final long timeoutNanos;
     private final long delayNanos;
 
-    private final RemoteDeviceId id;
-
     private volatile NetconfDeviceCommunicator listener;
     private volatile KeepaliveTask task;
 
-    public KeepaliveSalFacade(final RemoteDeviceId id, final RemoteDeviceHandler salFacade,
-            final ScheduledExecutorService executor, final long keepaliveDelaySeconds,
-            final long requestTimeoutMillis) {
-        this.id = id;
-        this.salFacade = salFacade;
-        this.executor = requireNonNull(executor);
+    public KeepaliveSalFacade(final RemoteDeviceId deviceId, final RemoteDeviceHandler deviceHandler, final Timer timer,
+            final long keepaliveDelaySeconds, final long requestTimeoutMillis) {
+        this.deviceId = requireNonNull(deviceId);
+        this.deviceHandler = requireNonNull(deviceHandler);
+        this.timer = requireNonNull(timer);
         this.keepaliveDelaySeconds = keepaliveDelaySeconds;
         delayNanos = TimeUnit.SECONDS.toNanos(keepaliveDelaySeconds);
         timeoutNanos = TimeUnit.MILLISECONDS.toNanos(requestTimeoutMillis);
     }
 
-    public KeepaliveSalFacade(final RemoteDeviceId id, final RemoteDeviceHandler salFacade,
-            final ScheduledExecutorService executor) {
-        this(id, salFacade, executor, DEFAULT_DELAY, DEFAULT_TRANSACTION_TIMEOUT_MILLI);
+    public KeepaliveSalFacade(final RemoteDeviceId deviceId, final RemoteDeviceHandler deviceHandler,
+            final Timer timer) {
+        this(deviceId, deviceHandler, timer, DEFAULT_DELAY, DEFAULT_TRANSACTION_TIMEOUT_MILLI);
     }
 
     /**
@@ -118,10 +117,10 @@ public final class KeepaliveSalFacade implements RemoteDeviceHandler {
         }
     }
 
-    void reconnect() {
-        checkState(listener != null, "%s: Unable to reconnect, session listener is missing", id);
+    private void disconnect() {
+        checkState(listener != null, "%s: Unable to reconnect, session listener is missing", deviceId);
         stopKeepalives();
-        LOG.info("{}: Reconnecting inactive netconf session", id);
+        LOG.info("{}: Reconnecting inactive netconf session", deviceId);
         listener.disconnect();
     }
 
@@ -140,15 +139,15 @@ public final class KeepaliveSalFacade implements RemoteDeviceHandler {
             throw new IllegalStateException("Unhandled " + devRpc);
         }
 
-        salFacade.onDeviceConnected(deviceSchema, sessionPreferences, new RemoteDeviceServices(keepaliveRpcs,
+        deviceHandler.onDeviceConnected(deviceSchema, sessionPreferences, new RemoteDeviceServices(keepaliveRpcs,
             // FIXME: wrap with keepalive
             services.actions()));
 
         // We have performed a callback, which might have termined keepalives
         final var localTask = task;
         if (localTask != null) {
-            LOG.debug("{}: Netconf session initiated, starting keepalives", id);
-            LOG.trace("{}: Scheduling keepalives every {}s", id, keepaliveDelaySeconds);
+            LOG.debug("{}: Netconf session initiated, starting keepalives", deviceId);
+            LOG.trace("{}: Scheduling keepalives every {}s", deviceId, keepaliveDelaySeconds);
             localTask.enableKeepalive();
         }
     }
@@ -156,13 +155,13 @@ public final class KeepaliveSalFacade implements RemoteDeviceHandler {
     @Override
     public void onDeviceDisconnected() {
         stopKeepalives();
-        salFacade.onDeviceDisconnected();
+        deviceHandler.onDeviceDisconnected();
     }
 
     @Override
     public void onDeviceFailed(final Throwable throwable) {
         stopKeepalives();
-        salFacade.onDeviceFailed(throwable);
+        deviceHandler.onDeviceFailed(throwable);
     }
 
     @Override
@@ -171,13 +170,13 @@ public final class KeepaliveSalFacade implements RemoteDeviceHandler {
         if (localTask != null) {
             localTask.recordActivity();
         }
-        salFacade.onNotification(domNotification);
+        deviceHandler.onNotification(domNotification);
     }
 
     @Override
     public void close() {
         stopKeepalives();
-        salFacade.close();
+        deviceHandler.close();
     }
 
     private <T> @NonNull ListenableFuture<T> scheduleTimeout(final ListenableFuture<T> invokeFuture) {
@@ -186,9 +185,9 @@ public final class KeepaliveSalFacade implements RemoteDeviceHandler {
         return timeout.userFuture;
     }
 
-    private void scheduleTimeout(final ListenableFuture<?> future, final TimeoutTask timeout) {
-        final var timeoutFuture = executor.schedule(timeout, timeoutNanos, TimeUnit.NANOSECONDS);
-        future.addListener(() -> timeoutFuture.cancel(false), MoreExecutors.directExecutor());
+    private void scheduleTimeout(final ListenableFuture<?> future, final TimeoutTask timeoutTask) {
+        final var timeout = timer.newTimeout(timeoutTask, timeoutNanos, TimeUnit.NANOSECONDS);
+        future.addListener(() -> timeout.cancel(), MoreExecutors.directExecutor());
     }
 
     /**
@@ -197,7 +196,7 @@ public final class KeepaliveSalFacade implements RemoteDeviceHandler {
      * response received, or the rcp could not even be sent) immediate reconnect is triggered as netconf session
      * is considered inactive/failed.
      */
-    private final class KeepaliveTask implements Runnable, FutureCallback<DOMRpcResult> {
+    private final class KeepaliveTask implements TimerTask, FutureCallback<DOMRpcResult> {
         // Keepalive RPC static resources
         static final @NonNull ContainerNode KEEPALIVE_PAYLOAD = NetconfMessageTransformUtil.wrap(
             NETCONF_GET_CONFIG_NODEID, getSourceNode(NETCONF_RUNNING_NODEID), NetconfMessageTransformUtil.EMPTY_FILTER);
@@ -214,7 +213,7 @@ public final class KeepaliveSalFacade implements RemoteDeviceHandler {
         }
 
         @Override
-        public void run() {
+        public void run(final Timeout timeout) {
             final long local = lastActivity;
             final long now = System.nanoTime();
             final long inFutureNanos = local + delayNanos - now;
@@ -247,13 +246,13 @@ public final class KeepaliveSalFacade implements RemoteDeviceHandler {
 
         private synchronized void sendKeepalive(final long now) {
             if (suppressed) {
-                LOG.debug("{}: Skipping keepalive while disabled", id);
+                LOG.debug("{}: Skipping keepalive while disabled", deviceId);
                 // suppressed -> unscheduled
                 suppressed = false;
                 return;
             }
 
-            LOG.trace("{}: Invoking keepalive RPC", id);
+            LOG.trace("{}: Invoking keepalive RPC", deviceId);
             final var deviceFuture = devRpc.invokeNetconf(NETCONF_GET_CONFIG_QNAME, KEEPALIVE_PAYLOAD);
             lastActivity = now;
 
@@ -266,8 +265,8 @@ public final class KeepaliveSalFacade implements RemoteDeviceHandler {
             // No matter what response we got, rpc-reply or rpc-error,
             // we got it from device so the netconf session is OK
             if (result == null) {
-                LOG.warn("{} Keepalive RPC returned null with response. Reconnecting netconf session", id);
-                reconnect();
+                LOG.warn("{} Keepalive RPC returned null with response. Reconnecting netconf session", deviceId);
+                disconnect();
                 return;
             }
 
@@ -276,11 +275,11 @@ public final class KeepaliveSalFacade implements RemoteDeviceHandler {
             } else {
                 final var errors = result.errors();
                 if (!errors.isEmpty()) {
-                    LOG.warn("{}: Keepalive RPC failed with error: {}", id, errors);
+                    LOG.warn("{}: Keepalive RPC failed with error: {}", deviceId, errors);
                     reschedule();
                 } else {
-                    LOG.warn("{} Keepalive RPC returned null with response. Reconnecting netconf session", id);
-                    reconnect();
+                    LOG.warn("{} Keepalive RPC returned null with response. Reconnecting netconf session", deviceId);
+                    disconnect();
                 }
             }
         }
@@ -288,11 +287,11 @@ public final class KeepaliveSalFacade implements RemoteDeviceHandler {
         @Override
         public void onFailure(final Throwable throwable) {
             if (throwable instanceof CancellationException) {
-                LOG.warn("{}: Keepalive RPC timed out. Reconnecting netconf session.", id);
+                LOG.warn("{}: Keepalive RPC timed out. Reconnecting netconf session.", deviceId);
             } else {
-                LOG.warn("{}: Keepalive RPC failed. Reconnecting netconf session.", id, throwable);
+                LOG.warn("{}: Keepalive RPC failed. Reconnecting netconf session.", deviceId, throwable);
             }
-            reconnect();
+            disconnect();
         }
 
         private void reschedule() {
@@ -300,11 +299,11 @@ public final class KeepaliveSalFacade implements RemoteDeviceHandler {
         }
 
         private void reschedule(final long delay) {
-            executor.schedule(this, delay, TimeUnit.NANOSECONDS);
+            timer.newTimeout(this, delay, TimeUnit.NANOSECONDS);
         }
     }
 
-    private static class TimeoutTask implements Runnable {
+    private static class TimeoutTask implements TimerTask {
         private final ListenableFuture<?> future;
 
         TimeoutTask(final ListenableFuture<?> future) {
@@ -312,7 +311,7 @@ public final class KeepaliveSalFacade implements RemoteDeviceHandler {
         }
 
         @Override
-        public final void run() {
+        public final void run(final Timeout timeout) {
             future.cancel(true);
         }
     }
@@ -342,13 +341,13 @@ public final class KeepaliveSalFacade implements RemoteDeviceHandler {
         public void onFailure(final Throwable throwable) {
             // User/Application RPC failed (The RPC did not reach the remote device or it has timeed out)
             if (throwable instanceof CancellationException) {
-                LOG.warn("{}: RPC timed out. Reconnecting netconf session", id);
+                LOG.warn("{}: RPC timed out. Reconnecting netconf session", deviceId);
             } else {
-                LOG.warn("{}: RPC failed. Reconnecting netconf session", id, throwable);
+                LOG.warn("{}: RPC failed. Reconnecting netconf session", deviceId, throwable);
             }
             userFuture.setException(throwable);
             // There is no point in keeping this session. Reconnect.
-            reconnect();
+            disconnect();
         }
     }