*/
package org.opendaylight.netconf.client;
-import io.netty.util.concurrent.Future;
+import com.google.common.util.concurrent.ListenableFuture;
import org.opendaylight.netconf.client.conf.NetconfClientConfiguration;
import org.opendaylight.netconf.transport.api.UnsupportedConfigurationException;
* Basic interface for Netconf client factory.
*/
public interface NetconfClientFactory extends AutoCloseable {
-
/**
- * Create netconf client. Network communication has to be set up based on network protocol specified in
+ * Create a NETCONF client. Network communication has to be set up based on network protocol specified in
* clientConfiguration
*
* @param clientConfiguration configuration
- * @return netconf client as {@link NetconfClientSession} future
+ * @return A future producing the {@link NetconfClientSession}
* @throws UnsupportedConfigurationException if any transport configuration parameters is invalid
*/
- Future<NetconfClientSession> createClient(NetconfClientConfiguration clientConfiguration)
+ ListenableFuture<NetconfClientSession> createClient(NetconfClientConfiguration clientConfiguration)
throws UnsupportedConfigurationException;
-
}
import static org.opendaylight.netconf.client.conf.NetconfClientConfiguration.NetconfClientProtocol.TLS;
import com.google.common.collect.ImmutableSet;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
import io.netty.util.HashedWheelTimer;
import io.netty.util.Timer;
-import io.netty.util.concurrent.DefaultPromise;
-import io.netty.util.concurrent.Future;
-import io.netty.util.concurrent.GlobalEventExecutor;
-import io.netty.util.concurrent.Promise;
import javax.inject.Singleton;
import org.opendaylight.netconf.client.conf.NetconfClientConfiguration;
import org.opendaylight.netconf.transport.api.TransportChannel;
}
@Override
- public Future<NetconfClientSession> createClient(final NetconfClientConfiguration configuration)
+ public ListenableFuture<NetconfClientSession> createClient(final NetconfClientConfiguration configuration)
throws UnsupportedConfigurationException {
final var protocol = configuration.getProtocol();
- final var promise = new DefaultPromise<NetconfClientSession>(GlobalEventExecutor.INSTANCE);
+ final var future = SettableFuture.<NetconfClientSession>create();
final var channelInitializer = new ClientChannelInitializer(createNegotiatorFactory(configuration),
() -> configuration.getSessionListener());
final var bootstrap = factory.newBootstrap();
if (TCP.equals(protocol)) {
- TCPClient.connect(new ClientTransportChannelListener(promise, channelInitializer), bootstrap,
+ TCPClient.connect(new ClientTransportChannelListener(future, channelInitializer), bootstrap,
configuration.getTcpParameters());
} else if (TLS.equals(protocol)) {
if (configuration.getTlsParameters() != null) {
- TLSClient.connect(new ClientTransportChannelListener(promise, channelInitializer), bootstrap,
+ TLSClient.connect(new ClientTransportChannelListener(future, channelInitializer), bootstrap,
configuration.getTcpParameters(), configuration.getTlsParameters());
} else {
- TLSClient.connect(new ClientTransportChannelListener(promise, channelInitializer), bootstrap,
+ TLSClient.connect(new ClientTransportChannelListener(future, channelInitializer), bootstrap,
configuration.getTcpParameters(), configuration.getTransportSslHandlerFactory());
}
} else if (SSH.equals(protocol)) {
- factory.connectClient("netconf", new ClientTransportChannelListener(promise, channelInitializer),
+ factory.connectClient("netconf", new ClientTransportChannelListener(future, channelInitializer),
configuration.getTcpParameters(), configuration.getSshParameters());
}
- return promise;
+ return future;
}
private NetconfClientSessionNegotiatorFactory createNegotiatorFactory(
}
private record ClientTransportChannelListener(
- Promise<NetconfClientSession> promise,
+ SettableFuture<NetconfClientSession> future,
ClientChannelInitializer initializer) implements TransportChannelListener {
ClientTransportChannelListener {
- requireNonNull(promise);
+ requireNonNull(future);
requireNonNull(initializer);
}
@Override
public void onTransportChannelEstablished(final TransportChannel channel) {
- initializer.initialize(channel.channel(), promise);
+ final var nettyChannel = channel.channel();
+ final var promise = nettyChannel.eventLoop().<NetconfClientSession>newPromise();
+ initializer.initialize(nettyChannel, promise);
+ promise.addListener(ignored -> {
+ final var cause = promise.cause();
+ if (cause != null) {
+ future.setException(cause);
+ } else {
+ future.set(promise.getNow());
+ }
+ });
}
@Override
public void onTransportChannelFailed(final Throwable cause) {
- promise.setFailure(cause);
+ future.setException(cause);
}
}
}