Use SettableFuture instead of Promise
[netconf.git] / protocol / netconf-client / src / main / java / org / opendaylight / netconf / client / NetconfClientFactoryImpl.java
index bf563493fb491c58a3e471efc72b3b93615ed060..06b95737b75b846b63ff6c931973f91c78574956 100644 (file)
@@ -13,12 +13,10 @@ import static org.opendaylight.netconf.client.conf.NetconfClientConfiguration.Ne
 import static org.opendaylight.netconf.client.conf.NetconfClientConfiguration.NetconfClientProtocol.TLS;
 
 import com.google.common.collect.ImmutableSet;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
 import io.netty.util.HashedWheelTimer;
 import io.netty.util.Timer;
-import io.netty.util.concurrent.DefaultPromise;
-import io.netty.util.concurrent.Future;
-import io.netty.util.concurrent.GlobalEventExecutor;
-import io.netty.util.concurrent.Promise;
 import javax.inject.Singleton;
 import org.opendaylight.netconf.client.conf.NetconfClientConfiguration;
 import org.opendaylight.netconf.transport.api.TransportChannel;
@@ -56,30 +54,30 @@ public class NetconfClientFactoryImpl implements NetconfClientFactory {
     }
 
     @Override
-    public Future<NetconfClientSession> createClient(final NetconfClientConfiguration configuration)
+    public ListenableFuture<NetconfClientSession> createClient(final NetconfClientConfiguration configuration)
             throws UnsupportedConfigurationException {
         final var protocol = configuration.getProtocol();
-        final var promise = new DefaultPromise<NetconfClientSession>(GlobalEventExecutor.INSTANCE);
+        final var future = SettableFuture.<NetconfClientSession>create();
         final var channelInitializer = new ClientChannelInitializer(createNegotiatorFactory(configuration),
             () -> configuration.getSessionListener());
         final var bootstrap = factory.newBootstrap();
 
         if (TCP.equals(protocol)) {
-            TCPClient.connect(new ClientTransportChannelListener(promise, channelInitializer), bootstrap,
+            TCPClient.connect(new ClientTransportChannelListener(future, channelInitializer), bootstrap,
                 configuration.getTcpParameters());
         } else if (TLS.equals(protocol)) {
             if (configuration.getTlsParameters() != null) {
-                TLSClient.connect(new ClientTransportChannelListener(promise, channelInitializer), bootstrap,
+                TLSClient.connect(new ClientTransportChannelListener(future, channelInitializer), bootstrap,
                     configuration.getTcpParameters(), configuration.getTlsParameters());
             } else {
-                TLSClient.connect(new ClientTransportChannelListener(promise, channelInitializer), bootstrap,
+                TLSClient.connect(new ClientTransportChannelListener(future, channelInitializer), bootstrap,
                     configuration.getTcpParameters(), configuration.getTransportSslHandlerFactory());
             }
         } else if (SSH.equals(protocol)) {
-            factory.connectClient("netconf", new ClientTransportChannelListener(promise, channelInitializer),
+            factory.connectClient("netconf", new ClientTransportChannelListener(future, channelInitializer),
                 configuration.getTcpParameters(), configuration.getSshParameters());
         }
-        return promise;
+        return future;
     }
 
     private NetconfClientSessionNegotiatorFactory createNegotiatorFactory(
@@ -96,21 +94,31 @@ public class NetconfClientFactoryImpl implements NetconfClientFactory {
     }
 
     private record ClientTransportChannelListener(
-            Promise<NetconfClientSession> promise,
+            SettableFuture<NetconfClientSession> future,
             ClientChannelInitializer initializer) implements TransportChannelListener {
         ClientTransportChannelListener {
-            requireNonNull(promise);
+            requireNonNull(future);
             requireNonNull(initializer);
         }
 
         @Override
         public void onTransportChannelEstablished(final TransportChannel channel) {
-            initializer.initialize(channel.channel(), promise);
+            final var nettyChannel = channel.channel();
+            final var promise = nettyChannel.eventLoop().<NetconfClientSession>newPromise();
+            initializer.initialize(nettyChannel, promise);
+            promise.addListener(ignored -> {
+                final var cause = promise.cause();
+                if (cause != null) {
+                    future.setException(cause);
+                } else {
+                    future.set(promise.getNow());
+                }
+            });
         }
 
         @Override
         public void onTransportChannelFailed(final Throwable cause) {
-            promise.setFailure(cause);
+            future.setException(cause);
         }
     }
 }