Migrate netconf-topology to new transport
[netconf.git] / netconf / callhome-protocol / src / main / java / org / opendaylight / netconf / callhome / protocol / CallHomeSessionContext.java
index 7f7fe81e5eab6458bfa9ce1a844a5f980825ff6b..b2e41e7eb0fc5f26a0885de075d257033564b99f 100644 (file)
@@ -11,6 +11,9 @@ import static com.google.common.base.Preconditions.checkArgument;
 import static java.util.Objects.requireNonNull;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelOutboundHandlerAdapter;
@@ -24,7 +27,6 @@ import java.security.PublicKey;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import org.eclipse.jdt.annotation.Nullable;
-import org.opendaylight.netconf.api.TransportConstants;
 import org.opendaylight.netconf.client.NetconfClientSession;
 import org.opendaylight.netconf.client.NetconfClientSessionListener;
 import org.opendaylight.netconf.client.NetconfClientSessionNegotiatorFactory;
@@ -37,13 +39,15 @@ import org.opendaylight.netconf.shaded.sshd.client.future.OpenFuture;
 import org.opendaylight.netconf.shaded.sshd.client.session.ClientSession;
 import org.opendaylight.netconf.shaded.sshd.common.future.SshFutureListener;
 import org.opendaylight.netconf.shaded.sshd.common.session.Session;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.device.rev230430.connection.parameters.Protocol.Name;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.device.rev231025.connection.parameters.Protocol.Name;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 // Non-final for testing
 class CallHomeSessionContext implements CallHomeProtocolSessionContext {
+
     private static final Logger LOG = LoggerFactory.getLogger(CallHomeSessionContext.class);
+    private static final String NETCONF = "netconf";
 
     @VisibleForTesting
     static final Session.AttributeKey<CallHomeSessionContext> SESSION_KEY = new Session.AttributeKey<>();
@@ -85,8 +89,8 @@ class CallHomeSessionContext implements CallHomeProtocolSessionContext {
         LOG.debug("Opening NETCONF Subsystem on {}", sshSession);
         try {
             final MinaSshNettyChannel nettyChannel = newMinaSshNettyChannel();
-            final ClientChannel netconfChannel = ((NetconfClientSessionImpl) sshSession).createSubsystemChannel(
-                TransportConstants.SSH_SUBSYSTEM, nettyChannel.pipeline());
+            final ClientChannel netconfChannel =
+                    ((NetconfClientSessionImpl) sshSession).createSubsystemChannel(NETCONF, nettyChannel.pipeline());
             netconfChannel.setStreaming(ClientChannel.Streaming.Async);
             netconfChannel.open().addListener(newSshFutureListener(netconfChannel, nettyChannel));
         } catch (IOException e) {
@@ -122,10 +126,10 @@ class CallHomeSessionContext implements CallHomeProtocolSessionContext {
         sshSession.close(false);
     }
 
-    private synchronized Promise<NetconfClientSession> doActivate(final ClientChannel netconfChannel,
+    private synchronized ListenableFuture<NetconfClientSession> doActivate(final ClientChannel netconfChannel,
             final NetconfClientSessionListener listener, final MinaSshNettyChannel nettyChannel) {
         if (activated) {
-            return newSessionPromise().setFailure(new IllegalStateException("Session already activated."));
+            return Futures.immediateFailedFuture(new IllegalStateException("Session already activated."));
         }
         activated = true;
         nettyChannel.pipeline().addFirst(new SshWriteAsyncHandlerAdapter(netconfChannel));
@@ -134,11 +138,21 @@ class CallHomeSessionContext implements CallHomeProtocolSessionContext {
         factory.getChannelInitializer(listener).initialize(nettyChannel, activationPromise);
         ((ChannelSubsystem) netconfChannel).onClose(nettyChannel::doNettyDisconnect);
         factory.getNettyGroup().register(nettyChannel).awaitUninterruptibly(500);
-        return activationPromise;
+        final SettableFuture<NetconfClientSession> future = SettableFuture.create();
+        activationPromise.addListener(ignored -> {
+            final var cause = activationPromise.cause();
+            if (cause != null) {
+                future.setException(cause);
+            } else {
+                future.set(activationPromise.getNow());
+            }
+        });
+        return future;
     }
 
     @Deprecated(since = "7.0.0", forRemoval = true)
-    protected MinaSshNettyChannel newMinaSshNettyChannel() {
+    @VisibleForTesting
+    MinaSshNettyChannel newMinaSshNettyChannel() {
         return new MinaSshNettyChannel(this, sshSession);
     }