Use SettableFuture instead of Promise 68/108468/3
authorRobert Varga <robert.varga@pantheon.tech>
Mon, 16 Oct 2023 18:34:33 +0000 (20:34 +0200)
committerRobert Varga <nite@hq.sk>
Mon, 16 Oct 2023 20:37:11 +0000 (20:37 +0000)
Perform correct bridging when we have a channel handy -- that allows us
to reuse the channel's thread.

JIRA: NETCONF-1108
Change-Id: Icdc1437f9ed9e0f560773185b2ba1f6124b0845b
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
protocol/netconf-client/src/main/java/org/opendaylight/netconf/client/NetconfClientFactory.java
protocol/netconf-client/src/main/java/org/opendaylight/netconf/client/NetconfClientFactoryImpl.java

index 5d19d580108761173ca718d4b1dcf62a2f693044..5a99f5a5428e62e6805a8d9f4301aac9060c7d83 100644 (file)
@@ -7,7 +7,7 @@
  */
 package org.opendaylight.netconf.client;
 
-import io.netty.util.concurrent.Future;
+import com.google.common.util.concurrent.ListenableFuture;
 import org.opendaylight.netconf.client.conf.NetconfClientConfiguration;
 import org.opendaylight.netconf.transport.api.UnsupportedConfigurationException;
 
@@ -15,16 +15,14 @@ import org.opendaylight.netconf.transport.api.UnsupportedConfigurationException;
  * Basic interface for Netconf client factory.
  */
 public interface NetconfClientFactory extends AutoCloseable {
-
     /**
-     * Create netconf client. Network communication has to be set up based on network protocol specified in
+     * Create a NETCONF client. Network communication has to be set up based on network protocol specified in
      * clientConfiguration
      *
      * @param clientConfiguration configuration
-     * @return netconf client as {@link NetconfClientSession} future
+     * @return A future producing the {@link NetconfClientSession}
      * @throws UnsupportedConfigurationException if any transport configuration parameters is invalid
      */
-    Future<NetconfClientSession> createClient(NetconfClientConfiguration clientConfiguration)
+    ListenableFuture<NetconfClientSession> createClient(NetconfClientConfiguration clientConfiguration)
         throws UnsupportedConfigurationException;
-
 }
index bf563493fb491c58a3e471efc72b3b93615ed060..06b95737b75b846b63ff6c931973f91c78574956 100644 (file)
@@ -13,12 +13,10 @@ import static org.opendaylight.netconf.client.conf.NetconfClientConfiguration.Ne
 import static org.opendaylight.netconf.client.conf.NetconfClientConfiguration.NetconfClientProtocol.TLS;
 
 import com.google.common.collect.ImmutableSet;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
 import io.netty.util.HashedWheelTimer;
 import io.netty.util.Timer;
-import io.netty.util.concurrent.DefaultPromise;
-import io.netty.util.concurrent.Future;
-import io.netty.util.concurrent.GlobalEventExecutor;
-import io.netty.util.concurrent.Promise;
 import javax.inject.Singleton;
 import org.opendaylight.netconf.client.conf.NetconfClientConfiguration;
 import org.opendaylight.netconf.transport.api.TransportChannel;
@@ -56,30 +54,30 @@ public class NetconfClientFactoryImpl implements NetconfClientFactory {
     }
 
     @Override
-    public Future<NetconfClientSession> createClient(final NetconfClientConfiguration configuration)
+    public ListenableFuture<NetconfClientSession> createClient(final NetconfClientConfiguration configuration)
             throws UnsupportedConfigurationException {
         final var protocol = configuration.getProtocol();
-        final var promise = new DefaultPromise<NetconfClientSession>(GlobalEventExecutor.INSTANCE);
+        final var future = SettableFuture.<NetconfClientSession>create();
         final var channelInitializer = new ClientChannelInitializer(createNegotiatorFactory(configuration),
             () -> configuration.getSessionListener());
         final var bootstrap = factory.newBootstrap();
 
         if (TCP.equals(protocol)) {
-            TCPClient.connect(new ClientTransportChannelListener(promise, channelInitializer), bootstrap,
+            TCPClient.connect(new ClientTransportChannelListener(future, channelInitializer), bootstrap,
                 configuration.getTcpParameters());
         } else if (TLS.equals(protocol)) {
             if (configuration.getTlsParameters() != null) {
-                TLSClient.connect(new ClientTransportChannelListener(promise, channelInitializer), bootstrap,
+                TLSClient.connect(new ClientTransportChannelListener(future, channelInitializer), bootstrap,
                     configuration.getTcpParameters(), configuration.getTlsParameters());
             } else {
-                TLSClient.connect(new ClientTransportChannelListener(promise, channelInitializer), bootstrap,
+                TLSClient.connect(new ClientTransportChannelListener(future, channelInitializer), bootstrap,
                     configuration.getTcpParameters(), configuration.getTransportSslHandlerFactory());
             }
         } else if (SSH.equals(protocol)) {
-            factory.connectClient("netconf", new ClientTransportChannelListener(promise, channelInitializer),
+            factory.connectClient("netconf", new ClientTransportChannelListener(future, channelInitializer),
                 configuration.getTcpParameters(), configuration.getSshParameters());
         }
-        return promise;
+        return future;
     }
 
     private NetconfClientSessionNegotiatorFactory createNegotiatorFactory(
@@ -96,21 +94,31 @@ public class NetconfClientFactoryImpl implements NetconfClientFactory {
     }
 
     private record ClientTransportChannelListener(
-            Promise<NetconfClientSession> promise,
+            SettableFuture<NetconfClientSession> future,
             ClientChannelInitializer initializer) implements TransportChannelListener {
         ClientTransportChannelListener {
-            requireNonNull(promise);
+            requireNonNull(future);
             requireNonNull(initializer);
         }
 
         @Override
         public void onTransportChannelEstablished(final TransportChannel channel) {
-            initializer.initialize(channel.channel(), promise);
+            final var nettyChannel = channel.channel();
+            final var promise = nettyChannel.eventLoop().<NetconfClientSession>newPromise();
+            initializer.initialize(nettyChannel, promise);
+            promise.addListener(ignored -> {
+                final var cause = promise.cause();
+                if (cause != null) {
+                    future.setException(cause);
+                } else {
+                    future.set(promise.getNow());
+                }
+            });
         }
 
         @Override
         public void onTransportChannelFailed(final Throwable cause) {
-            promise.setFailure(cause);
+            future.setException(cause);
         }
     }
 }