Migrate netconf-topology to new transport
[netconf.git] / apps / netconf-topology / src / main / java / org / opendaylight / netconf / topology / spi / NetconfNodeHandler.java
index 47f980c66039ce8fa1c47e8e1077254aab9e1039..2e0de08a1c6e4f817522865ce51f2b56cf2d0194 100644 (file)
@@ -10,11 +10,12 @@ package org.opendaylight.netconf.topology.spi;
 import static java.util.Objects.requireNonNull;
 
 import com.google.common.annotations.VisibleForTesting;
-import io.netty.util.concurrent.EventExecutor;
-import io.netty.util.concurrent.Future;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningScheduledExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
@@ -22,7 +23,7 @@ import org.checkerframework.checker.lock.qual.GuardedBy;
 import org.checkerframework.checker.lock.qual.Holding;
 import org.eclipse.jdt.annotation.NonNull;
 import org.opendaylight.mdsal.dom.api.DOMNotification;
-import org.opendaylight.netconf.client.NetconfClientDispatcher;
+import org.opendaylight.netconf.client.NetconfClientFactory;
 import org.opendaylight.netconf.client.conf.NetconfClientConfiguration;
 import org.opendaylight.netconf.client.mdsal.LibraryModulesSchemas;
 import org.opendaylight.netconf.client.mdsal.LibrarySchemaSourceProvider;
@@ -40,6 +41,7 @@ import org.opendaylight.netconf.client.mdsal.api.RemoteDeviceId;
 import org.opendaylight.netconf.client.mdsal.api.RemoteDeviceServices;
 import org.opendaylight.netconf.client.mdsal.api.SchemaResourceManager;
 import org.opendaylight.netconf.client.mdsal.spi.KeepaliveSalFacade;
+import org.opendaylight.netconf.transport.api.UnsupportedConfigurationException;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.Uri;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.optional.rev221225.NetconfNodeAugmentedOptional;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev221225.NetconfNode;
@@ -58,11 +60,11 @@ public final class NetconfNodeHandler extends AbstractRegistration implements Re
     private static final Logger LOG = LoggerFactory.getLogger(NetconfNodeHandler.class);
 
     private final @NonNull List<SchemaSourceRegistration<?>> yanglibRegistrations;
-    private final @NonNull NetconfClientDispatcher clientDispatcher;
+    private final @NonNull NetconfClientFactory clientFactory;
     private final @NonNull NetconfClientConfiguration clientConfig;
     private final @NonNull NetconfDeviceCommunicator communicator;
     private final @NonNull RemoteDeviceHandler delegate;
-    private final @NonNull EventExecutor eventExecutor;
+    private final @NonNull ListeningScheduledExecutorService scheduledExecutor;
     private final @NonNull RemoteDeviceId deviceId;
 
     private final long maxAttempts;
@@ -74,17 +76,18 @@ public final class NetconfNodeHandler extends AbstractRegistration implements Re
     @GuardedBy("this")
     private long lastSleep;
     @GuardedBy("this")
-    private Future<?> currentTask;
+    private ListenableFuture<?> currentTask;
 
-    public NetconfNodeHandler(final NetconfClientDispatcher clientDispatcher, final EventExecutor eventExecutor,
-            final ScheduledExecutorService keepaliveExecutor, final BaseNetconfSchemas baseSchemas,
+    public NetconfNodeHandler(final NetconfClientFactory clientFactory,
+            final ScheduledExecutorService scheduledExecutor, final BaseNetconfSchemas baseSchemas,
             final SchemaResourceManager schemaManager, final Executor processingExecutor,
             final NetconfClientConfigurationBuilderFactory builderFactory,
             final DeviceActionFactory deviceActionFactory, final RemoteDeviceHandler delegate,
             final RemoteDeviceId deviceId, final NodeId nodeId, final NetconfNode node,
             final NetconfNodeAugmentedOptional nodeOptional) {
-        this.clientDispatcher = requireNonNull(clientDispatcher);
-        this.eventExecutor = requireNonNull(eventExecutor);
+        this.clientFactory = requireNonNull(clientFactory);
+        // FIXME: do not wrap this executor
+        this.scheduledExecutor = MoreExecutors.listeningDecorator(scheduledExecutor);
         this.delegate = requireNonNull(delegate);
         this.deviceId = requireNonNull(deviceId);
 
@@ -104,7 +107,7 @@ public final class NetconfNodeHandler extends AbstractRegistration implements Re
         final long keepaliveDelay = node.requireKeepaliveDelay().toJava();
         if (keepaliveDelay > 0) {
             LOG.info("Adding keepalive facade, for device {}", nodeId);
-            salFacade = keepAliveFacade = new KeepaliveSalFacade(deviceId, this, keepaliveExecutor, keepaliveDelay,
+            salFacade = keepAliveFacade = new KeepaliveSalFacade(deviceId, this, scheduledExecutor, keepaliveDelay,
                 node.requireDefaultRequestTimeoutMillis().toJava());
         } else {
             salFacade = this;
@@ -154,11 +157,16 @@ public final class NetconfNodeHandler extends AbstractRegistration implements Re
 
     @Holding("this")
     private void lockedConnect() {
-        currentTask = clientDispatcher.createClient(clientConfig);
-        currentTask.addListener(this::connectComplete);
+        try {
+            final var clientFuture = clientFactory.createClient(clientConfig);
+            clientFuture.addListener(() -> connectComplete(clientFuture), MoreExecutors.directExecutor());
+            currentTask = clientFuture;
+        } catch (UnsupportedConfigurationException e) {
+            onDeviceFailed(e);
+        }
     }
 
-    private void connectComplete(final Future<?> future) {
+    private void connectComplete(final ListenableFuture<?> future) {
         // Locked manipulation of internal state
         synchronized (this) {
             // A quick sanity check
@@ -166,16 +174,18 @@ public final class NetconfNodeHandler extends AbstractRegistration implements Re
                 LOG.warn("Ignoring connection completion, expected {} actual {}", future, currentTask);
                 return;
             }
-
             currentTask = null;
-            final var cause = future.cause();
-            if (cause == null || cause instanceof CancellationException) {
-                // Success or cancellation, nothing else to do.
-                // In case of success the rest of the setup is driven by RemoteDeviceHandler callbacks
-                return;
+            // ListenableFuture provide no detail on error unless you attempt to get() the result
+            // then only the original exception is rethrown wrapped with ExecutionException
+            try {
+                if (future.isCancelled() || future.isDone() && future.get() != null) {
+                    // Success or cancellation, nothing else to do.
+                    // In case of success the rest of the setup is driven by RemoteDeviceHandler callbacks
+                    return;
+                }
+            } catch (InterruptedException | ExecutionException e) {
+                LOG.debug("Connection attempt {} to {} failed", attempts, deviceId, e);
             }
-
-            LOG.debug("Connection attempt {} to {} failed", attempts, deviceId, cause);
         }
 
         // We are invoking callbacks, do not hold locks
@@ -257,9 +267,9 @@ public final class NetconfNodeHandler extends AbstractRegistration implements Re
 
         // Schedule a task for the right time. We always go through the executor to eliminate the special case of
         // immediate reconnect. While we could check and got to lockedConnect(), it makes for a rare special case.
-        // That special case makes for more code paths to test and introduces additional uncertainty as to whether
-        // the attempt was executed on on this thread or not.
-        currentTask = eventExecutor.schedule(this::reconnect, delayMillis, TimeUnit.MILLISECONDS);
+        // That special case makes for more code paths to test and introduces additional uncertainty whether
+        // the attempt was executed on this thread or not.
+        currentTask = scheduledExecutor.schedule(this::reconnect, delayMillis, TimeUnit.MILLISECONDS);
         return null;
     }