Require a subsystem for client connections 96/108396/8
authorRobert Varga <robert.varga@pantheon.tech>
Fri, 13 Oct 2023 12:13:50 +0000 (14:13 +0200)
committerRobert Varga <robert.varga@pantheon.tech>
Fri, 13 Oct 2023 18:38:53 +0000 (20:38 +0200)
SSHClient does not really operate on the raw session, hence we should
not report it as set up after authentication.

Each client is connected to a particular SSH subsystem, and hence we
require the desired name (for example "netconf", but can be anything).

Once we complete authentication, we request the specified subsystem.
When the subsystem is open, we attach it to the Netty channel and only
then do we report the SSHClient to be up.

JIRA: NETCONF-1108
Change-Id: I9210b2924a9b8239161c8f4c649db3677e63b5b3
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
protocol/netconf-server/src/test/java/org/opendaylight/netconf/server/NetconfServerFactoryImplTest.java
transport/transport-ssh/pom.xml
transport/transport-ssh/src/main/java/org/opendaylight/netconf/transport/ssh/SSHClient.java
transport/transport-ssh/src/main/java/org/opendaylight/netconf/transport/ssh/SSHServer.java
transport/transport-ssh/src/main/java/org/opendaylight/netconf/transport/ssh/SSHTransportStack.java
transport/transport-ssh/src/main/java/org/opendaylight/netconf/transport/ssh/SSHTransportStackFactory.java
transport/transport-ssh/src/main/java/org/opendaylight/netconf/transport/ssh/TransportClientSession.java [new file with mode: 0644]
transport/transport-ssh/src/main/java/org/opendaylight/netconf/transport/ssh/TransportClientSessionFactory.java [new file with mode: 0644]
transport/transport-ssh/src/main/java/org/opendaylight/netconf/transport/ssh/TransportClientSubsystem.java [new file with mode: 0644]
transport/transport-ssh/src/main/java/org/opendaylight/netconf/transport/ssh/TransportSshClient.java
transport/transport-ssh/src/test/java/org/opendaylight/netconf/transport/ssh/SshClientServerTest.java

index f788e0c28e8f83a49ec862446a0308a86ad46218..ae23b251540f1f3a3896d620c9b323c72f4dd08b 100644 (file)
@@ -153,7 +153,7 @@ class NetconfServerFactoryImplTest {
         throws Exception {
         final var server = serverFuture.get(2, TimeUnit.SECONDS);
         try {
-            final var client = FACTORY.connectClient(clientListener, tcpClientParams, sshClientParams)
+            final var client = FACTORY.connectClient("netconf", clientListener, tcpClientParams, sshClientParams)
                 .get(2, TimeUnit.SECONDS);
             try {
                 // FIXME commented line requires netconf client to trigger netconf subsystem initialization on server
index fa2eb3a529b2361162d260eda3e7028b10b24fb8..28bff30771a0cb9a9fc7bbddc753fdf01bec36a9 100644 (file)
             <groupId>com.google.guava</groupId>
             <artifactId>guava</artifactId>
         </dependency>
+        <dependency>
+            <groupId>io.netty</groupId>
+            <artifactId>netty-buffer</artifactId>
+        </dependency>
         <dependency>
             <groupId>io.netty</groupId>
             <artifactId>netty-common</artifactId>
index 60152c57d4fb0d5066751a9087ef090cacade898..c8e465aef754eeaa20bc38506cab6b1d3bae19ef 100644 (file)
@@ -13,7 +13,8 @@ import io.netty.bootstrap.ServerBootstrap;
 import io.netty.channel.EventLoopGroup;
 import java.io.IOException;
 import org.eclipse.jdt.annotation.NonNull;
-import org.opendaylight.netconf.shaded.sshd.client.session.ClientSession;
+import org.opendaylight.netconf.shaded.sshd.client.future.AuthFuture;
+import org.opendaylight.netconf.shaded.sshd.client.future.OpenFuture;
 import org.opendaylight.netconf.shaded.sshd.common.session.Session;
 import org.opendaylight.netconf.shaded.sshd.netty.NettyIoServiceFactoryFactory;
 import org.opendaylight.netconf.transport.api.TransportChannelListener;
@@ -24,19 +25,31 @@ import org.opendaylight.netconf.transport.tcp.TCPServer;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.ssh.client.rev230417.SshClientGrouping;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.tcp.client.rev230417.TcpClientGrouping;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.tcp.server.rev230417.TcpServerGrouping;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * A {@link TransportStack} acting as an SSH client.
  */
 public final class SSHClient extends SSHTransportStack {
-    private SSHClient(final TransportChannelListener listener, final TransportSshClient sshClient) {
+    private static final Logger LOG = LoggerFactory.getLogger(SSHClient.class);
+
+    private final String subsystem;
+
+    private SSHClient(final String subsystem, final TransportChannelListener listener,
+            final TransportSshClient sshClient) {
         super(listener, sshClient, sshClient.getSessionFactory());
+        // Mirrors check in ChannelSubsystem's constructor
+        if (subsystem.isBlank()) {
+            throw new IllegalArgumentException("Blank subsystem");
+        }
+        this.subsystem = subsystem;
     }
 
     static SSHClient of(final NettyIoServiceFactoryFactory ioServiceFactory, final EventLoopGroup group,
-            final TransportChannelListener listener, final SshClientGrouping clientParams)
+            final String subsystem, final TransportChannelListener listener, final SshClientGrouping clientParams)
                 throws UnsupportedConfigurationException {
-        return new SSHClient(listener, new TransportSshClient.Builder(ioServiceFactory, group)
+        return new SSHClient(subsystem, listener, new TransportSshClient.Builder(ioServiceFactory, group)
             .transportParams(clientParams.getTransportParams())
             .keepAlives(clientParams.getKeepalives())
             .clientIdentity(clientParams.getClientIdentity())
@@ -56,15 +69,51 @@ public final class SSHClient extends SSHTransportStack {
 
     @Override
     void onKeyEstablished(final Session session) throws IOException {
-        if (!(session instanceof ClientSession clientSession)) {
-            throw new IOException("Unexpected session " + session);
+        // server key is accepted, trigger authentication flow
+        final var sessionId = sessionId(session);
+        LOG.debug("Authenticating session {}", sessionId);
+        cast(session).auth().addListener(future -> onAuthComplete(future, sessionId));
+    }
+
+    private void onAuthComplete(final AuthFuture future, final Long sessionId) {
+        if (!future.isSuccess()) {
+            LOG.info("Session {} authentication failed", sessionId);
+            deleteSession(sessionId);
+        } else {
+            LOG.debug("Session {} authenticated", sessionId);
         }
+    }
 
-        // server key is accepted, trigger authentication flow
-        clientSession.auth().addListener(future -> {
-            if (!future.isSuccess()) {
-                deleteSession(session);
-            }
-        });
+    @Override
+    void onAuthenticated(final Session session) throws IOException {
+        final var sessionId = sessionId(session);
+        LOG.debug("Opening \"{}\" subsystem on session {}", subsystem, sessionId);
+
+        final var underlay = underlayOf(sessionId);
+        if (underlay == null) {
+            throw new IOException("Cannot find underlay for " + session);
+        }
+
+        final var clientSession = cast(session);
+        final var channel = clientSession.createSubsystemChannel(subsystem);
+        channel.onClose(() -> clientSession.close(true));
+        channel.open(underlay).addListener(future -> onSubsystemOpenComplete(future, sessionId));
+    }
+
+    private void onSubsystemOpenComplete(final OpenFuture future, final Long sessionId) {
+        if (future.isOpened()) {
+            LOG.debug("Established transport on session {}", sessionId);
+            completeUnderlay(sessionId, underlay -> addTransportChannel(new SSHTransportChannel(underlay)));
+        } else {
+            LOG.error("Failed to establish transport on session {}", sessionId, future.getException());
+            deleteSession(sessionId);
+        }
+    }
+
+    private static TransportClientSession cast(final Session session) throws IOException {
+        if (session instanceof TransportClientSession clientSession) {
+            return clientSession;
+        }
+        throw new IOException("Unexpected session " + session);
     }
-}
\ No newline at end of file
+}
index bbbf4a933bc7e2b8fbccc3101aa6020ec7cff9c2..8631e105ae05a3aad4cebe5fba3f2b7260f3b0dc 100644 (file)
@@ -23,11 +23,15 @@ import org.opendaylight.netconf.transport.tcp.TCPServer;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.ssh.server.rev230417.SshServerGrouping;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.tcp.client.rev230417.TcpClientGrouping;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.tcp.server.rev230417.TcpServerGrouping;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * A {@link TransportStack} acting as an SSH server.
  */
 public final class SSHServer extends SSHTransportStack {
+    private static final Logger LOG = LoggerFactory.getLogger(SSHServer.class);
+
     private SSHServer(final TransportChannelListener listener, final TransportSshServer sshServer) {
         super(listener, sshServer, sshServer.getSessionFactory());
     }
@@ -56,4 +60,12 @@ public final class SSHServer extends SSHTransportStack {
     void onKeyEstablished(final Session session) {
         // No-op
     }
+
+    @Override
+    void onAuthenticated(final Session session) {
+        final var sessionId = sessionId(session);
+        LOG.debug("Established transport on session {}", sessionId);
+        // FIXME: we should wait for the subsystem to be created and then finish
+        completeUnderlay(sessionId, underlay -> addTransportChannel(new SSHTransportChannel(underlay)));
+    }
 }
\ No newline at end of file
index e7090ee82c4ef9259fd1b6d25835839bb213045c..c637ffbecdceb27324277aadc7a54a69800d4895 100644 (file)
@@ -14,7 +14,9 @@ import java.util.Collection;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.function.Consumer;
+import org.eclipse.jdt.annotation.Nullable;
 import org.opendaylight.netconf.shaded.sshd.common.FactoryManager;
+import org.opendaylight.netconf.shaded.sshd.common.SshConstants;
 import org.opendaylight.netconf.shaded.sshd.common.io.IoHandler;
 import org.opendaylight.netconf.shaded.sshd.common.session.Session;
 import org.opendaylight.netconf.shaded.sshd.common.session.SessionListener;
@@ -22,13 +24,31 @@ import org.opendaylight.netconf.transport.api.AbstractOverlayTransportStack;
 import org.opendaylight.netconf.transport.api.TransportChannel;
 import org.opendaylight.netconf.transport.api.TransportChannelListener;
 import org.opendaylight.netconf.transport.api.TransportStack;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
- * An SSH {@link TransportStack}. Instances of this class are built indirectly.
+ * An SSH {@link TransportStack}. Instances of this class are built indirectly. The setup of the Netty channel is quite
+ * weird. We start off with whatever the underlay sets up.
+ *
+ * <p>
+ * We then add {@link TransportIoSession#getHandler()}, which routes data between the socket and
+ * {@link TransportSshClient} (or {@link TransportSshServer}) -- forming the "bottom half" of the channel.
+ *
+ * <p>
+ * The "upper half" of the channel is formed once the corresponding SSH subsystem is established, via
+ * {@link TransportClientSubsystem}, which installs a {@link OutboundChannelHandler}. These two work together:
+ * <ul>
+ *   <li>TransportClientSubsystem pumps bytes inbound from the subsystem towards the tail of the channel pipeline</li>
+ *   <li>OutboundChannelHandler pumps bytes outbound from the tail of channel pipeline into the subsystem</li>
+ * </ul>
  */
 public abstract sealed class SSHTransportStack extends AbstractOverlayTransportStack<SSHTransportChannel>
         implements SessionListener permits SSHClient, SSHServer {
-    private final Map<Long, TransportChannel> unauthUnderlays = new ConcurrentHashMap<>();
+    private static final Logger LOG = LoggerFactory.getLogger(SSHTransportStack.class);
+
+    // Underlay TransportChannels which do not have an open subsystem
+    private final Map<Long, TransportChannel> underlays = new ConcurrentHashMap<>();
     private final Map<Long, Session> sessions = new ConcurrentHashMap<>();
     private final TransportIoService ioService;
 
@@ -44,81 +64,106 @@ public abstract sealed class SSHTransportStack extends AbstractOverlayTransportS
 
     @Override
     protected void onUnderlayChannelEstablished(final TransportChannel underlayChannel) {
+        LOG.debug("Underlay establishing, attaching SSH to {}", underlayChannel);
+        // Acquire underlying channel, create a TransportIoSession and attach its handler to this channel -- which takes
+        // care of routing bytes between the underlay channel and SSHD's network-facing side.
         final var channel = underlayChannel.channel();
         final var ioSession = ioService.createSession(channel.localAddress());
-
         channel.pipeline().addLast(ioSession.getHandler());
-        // authentication triggering and handlers processing is performed by UserAuthSessionListener
-        unauthUnderlays.put(ioSession.getId(), underlayChannel);
+
+        // we now have an attached underlay, but it needs further processing before we expose it to the end user
+        underlays.put(ioSession.getId(), underlayChannel);
     }
 
-    /*
-     * SessionListener integration. Responsible for session authentication for both client and server.
-     *
-     * <P>Triggers authentication flow when after server key is accepted by client,
-     * invokes associated handler on authentication success/failure.
-     */
+    // SessionListener integration. Responsible for observing authentication-related events, orchestrating both client
+    // and server interactions.
+    //
+    // The state machine is responsible for driving TransportChannel
+
+    //
+    // At some point we should keep this in an encapsulated state object, but we have specializations, so we keep this
+    // here at the cost of not modeling the solution domain correctly.
+
     @Override
     public final void sessionCreated(final Session session) {
-        sessions.put(session.getIoSession().getId(), session);
+        sessions.put(sessionId(session), session);
     }
 
     @Override
     public final void sessionException(final Session session, final Throwable throwable) {
-        deleteSession(session);
+        final var sessionId = sessionId(session);
+        LOG.warn("Session {} encountered an error", sessionId, throwable);
+        deleteSession(sessionId);
     }
 
     @Override
     public final void sessionDisconnect(final Session session, final int reason, final String msg,
             final String language, final boolean initiator) {
-        deleteSession(session);
+        final var sessionId = sessionId(session);
+        LOG.debug("Session {} disconnected: {}", sessionId, SshConstants.getDisconnectReasonName(reason));
+        deleteSession(sessionId);
     }
 
     @Override
     public final void sessionClosed(final Session session) {
-        deleteSession(session);
+        final var sessionId = sessionId(session);
+        LOG.debug("Session {} closed", sessionId);
+        deleteSession(sessionId);
     }
 
     @Override
     public final void sessionEvent(final Session session, final Event event) {
+        final var sessionId = sessionId(session);
         switch (event) {
-            case Authenticated:
-                onAuthenticated(session);
-                break;
-            case KeyEstablished:
+            case KeyEstablished -> {
+                LOG.debug("New key established on session {}", sessionId);
                 try {
                     onKeyEstablished(session);
                 } catch (IOException e) {
-                    sessionException(session, e);
+                    LOG.error("Post-key step failed on session {}", sessionId, e);
+                    deleteSession(sessionId);
                 }
-                break;
-            default:
-                // No-op
+            }
+            case Authenticated -> {
+                LOG.debug("Authentication on session {} successful", sessionId);
+                try {
+                    onAuthenticated(session);
+                } catch (IOException e) {
+                    LOG.error("Post-authentication step failed on session {}", sessionId, e);
+                    deleteSession(sessionId);
+                }
+            }
+            case KexCompleted -> {
+                LOG.debug("Key exchange completed on session {}", sessionId);
+            }
+            default -> {
+                LOG.debug("Ignoring event {} on session {}", event, sessionId);
+            }
         }
     }
 
     abstract void onKeyEstablished(Session session) throws IOException;
 
-    private void onAuthenticated(final Session session) {
-        // auth success
-        completeAuth(idOf(session), underlay -> addTransportChannel(new SSHTransportChannel(underlay)));
+    abstract void onAuthenticated(Session session) throws IOException;
+
+    final @Nullable TransportChannel underlayOf(final Long sessionId) {
+        return underlays.get(sessionId);
     }
 
-    final void deleteSession(final Session session) {
-        final var id = idOf(session);
-        sessions.remove(id);
+    final void deleteSession(final Long sessionId) {
+        sessions.remove(sessionId);
         // auth failure, close underlay if any
-        completeAuth(id, underlay -> underlay.channel().close());
+        completeUnderlay(sessionId, underlay -> underlay.channel().close());
     }
 
-    private void completeAuth(final Long sessionId, final Consumer<TransportChannel> action) {
-        final var removed = unauthUnderlays.remove(sessionId);
+    final void completeUnderlay(final Long sessionId, final Consumer<TransportChannel> action) {
+        final var removed = underlays.remove(sessionId);
         if (removed != null) {
             action.accept(removed);
         }
     }
 
-    private static Long idOf(final Session session) {
+    static final Long sessionId(final Session session) {
         return session.getIoSession().getId();
     }
 
index 907cd970fa02a9d19a731630426f4df1c4963ab6..bb31fe03a85aab3392b10d03a3f05deba2344fb8 100644 (file)
@@ -49,16 +49,18 @@ public final class SSHTransportStackFactory implements AutoCloseable {
             NettyTransportSupport.newEventLoopGroup(parentGroupName, parentGroupThreads));
     }
 
-    public @NonNull ListenableFuture<SSHClient> connectClient(final TransportChannelListener listener,
-            final TcpClientGrouping connectParams, final SshClientGrouping clientParams)
-                throws UnsupportedConfigurationException {
-        return SSHClient.of(ioServiceFactory, group, listener, clientParams).connect(newBootstrap(), connectParams);
+    public @NonNull ListenableFuture<SSHClient> connectClient(final String subsystem,
+            final TransportChannelListener listener, final TcpClientGrouping connectParams,
+            final SshClientGrouping clientParams) throws UnsupportedConfigurationException {
+        return SSHClient.of(ioServiceFactory, group, subsystem, listener, clientParams)
+            .connect(newBootstrap(), connectParams);
     }
 
-    public @NonNull ListenableFuture<SSHClient> listenClient(final TransportChannelListener listener,
-            final TcpServerGrouping listenParams, final SshClientGrouping clientParams)
-                throws UnsupportedConfigurationException {
-        return SSHClient.of(ioServiceFactory, group, listener, clientParams).listen(newServerBootstrap(), listenParams);
+    public @NonNull ListenableFuture<SSHClient> listenClient(final String subsystem,
+            final TransportChannelListener listener, final TcpServerGrouping listenParams,
+            final SshClientGrouping clientParams) throws UnsupportedConfigurationException {
+        return SSHClient.of(ioServiceFactory, group, subsystem, listener, clientParams)
+            .listen(newServerBootstrap(), listenParams);
     }
 
     public @NonNull ListenableFuture<SSHServer> connectServer(final TransportChannelListener listener,
diff --git a/transport/transport-ssh/src/main/java/org/opendaylight/netconf/transport/ssh/TransportClientSession.java b/transport/transport-ssh/src/main/java/org/opendaylight/netconf/transport/ssh/TransportClientSession.java
new file mode 100644 (file)
index 0000000..4c0a586
--- /dev/null
@@ -0,0 +1,32 @@
+/*
+ * Copyright (c) 2023 PANTHEON.tech, s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.netconf.transport.ssh;
+
+import java.io.IOException;
+import org.opendaylight.netconf.shaded.sshd.client.session.ClientSessionImpl;
+import org.opendaylight.netconf.shaded.sshd.common.io.IoSession;
+
+/**
+ * Our own version of {@link ClientSessionImpl}, bound to a backend Netty channel.
+ */
+final class TransportClientSession extends ClientSessionImpl {
+    TransportClientSession(final TransportSshClient client, final IoSession ioSession) throws Exception {
+        super(client, ioSession);
+    }
+
+    @Override
+    public TransportClientSubsystem createSubsystemChannel(final String subsystem) throws IOException {
+        final var channel = new TransportClientSubsystem(subsystem);
+        final var service = getConnectionService();
+        final var id = service.registerChannel(channel);
+        if (log.isDebugEnabled()) {
+            log.debug("createSubsystemChannel({})[{}] created id={}", this, subsystem, id);
+        }
+        return channel;
+    }
+}
diff --git a/transport/transport-ssh/src/main/java/org/opendaylight/netconf/transport/ssh/TransportClientSessionFactory.java b/transport/transport-ssh/src/main/java/org/opendaylight/netconf/transport/ssh/TransportClientSessionFactory.java
new file mode 100644 (file)
index 0000000..8c9f5ff
--- /dev/null
@@ -0,0 +1,32 @@
+/*
+ * Copyright (c) 2023 PANTHEON.tech, s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.netconf.transport.ssh;
+
+import static java.util.Objects.requireNonNull;
+
+import org.opendaylight.netconf.shaded.sshd.client.session.SessionFactory;
+import org.opendaylight.netconf.shaded.sshd.common.io.IoSession;
+
+/**
+ * A {@link SessionFactory} producing {@link TransportClientSession}s for a particular user.
+ */
+final class TransportClientSessionFactory extends SessionFactory {
+    private final String username;
+
+    TransportClientSessionFactory(final TransportSshClient client, final String username) {
+        super(client);
+        this.username = requireNonNull(username);
+    }
+
+    @Override
+    protected TransportClientSession doCreateSession(final IoSession ioSession) throws Exception {
+        final var ret = new TransportClientSession((TransportSshClient) getClient(), ioSession);
+        ret.setUsername(username);
+        return ret;
+    }
+}
diff --git a/transport/transport-ssh/src/main/java/org/opendaylight/netconf/transport/ssh/TransportClientSubsystem.java b/transport/transport-ssh/src/main/java/org/opendaylight/netconf/transport/ssh/TransportClientSubsystem.java
new file mode 100644 (file)
index 0000000..80a0a1e
--- /dev/null
@@ -0,0 +1,98 @@
+/*
+ * Copyright (c) 2023 PANTHEON.tech, s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.netconf.transport.ssh;
+
+import com.google.errorprone.annotations.DoNotCall;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import java.io.IOException;
+import org.opendaylight.netconf.shaded.sshd.client.channel.ChannelSubsystem;
+import org.opendaylight.netconf.shaded.sshd.client.future.OpenFuture;
+import org.opendaylight.netconf.transport.api.TransportChannel;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link ChannelSubsystem} bound to a {@link SSHClient} and a Netty channel.
+ */
+final class TransportClientSubsystem extends ChannelSubsystem {
+    private static final Logger LOG = LoggerFactory.getLogger(TransportClientSubsystem.class);
+
+    private ChannelHandlerContext pipelineHead;
+
+    TransportClientSubsystem(final String subsystem) {
+        super(subsystem);
+        setStreaming(Streaming.Async);
+    }
+
+    @Override
+    @Deprecated
+    @DoNotCall("Always throws UnsupportedOperationException")
+    public OpenFuture open() throws IOException {
+        throw new UnsupportedOperationException();
+    }
+
+    synchronized OpenFuture open(final TransportChannel underlay) throws IOException {
+        LOG.debug("Opening client subsystem \"{}\"", getSubsystem());
+        final var openFuture = super.open();
+        openFuture.addListener(future -> onOpenComplete(future, underlay));
+        return openFuture;
+    }
+
+    private void onOpenComplete(final OpenFuture future, final TransportChannel underlay) {
+        if (!future.isOpened()) {
+            LOG.debug("Failed to open client subsystem \"{}\"", getSubsystem(), future.getException());
+            return;
+        }
+
+        // Note that there may be multiple handlers already present on the channel, hence we are attaching last, but
+        // from the logical perspective we are the head handlers.
+        final var pipeline = underlay.channel().pipeline();
+
+        // - install outbound packet handler, i.e. moving bytes from the channel into SSHD's pipeline
+        pipeline.addLast(new OutboundChannelHandler(getAsyncIn()));
+        // - remember the context of this handler, we will be using it to issue writes into the channel
+        pipelineHead = pipeline.lastContext();
+
+        // - install inner channel termination handler
+        pipeline.addLast(new ChannelInboundHandlerAdapter() {
+            @Override
+            public void channelInactive(final ChannelHandlerContext ctx) throws IOException {
+                close();
+            }
+        });
+    }
+
+    @Override
+    protected void doWriteData(final byte[] data, final int off, final long len) throws IOException {
+        // If we're already closing, ignore incoming data
+        if (isClosing()) {
+            return;
+        }
+
+        final int reqLen = (int) len;
+        if (reqLen > 0) {
+            LOG.debug("Forwarding {} bytes of data", reqLen);
+            pipelineHead.fireChannelRead(Unpooled.copiedBuffer(data, off, reqLen));
+            getLocalWindow().release(reqLen);
+        }
+    }
+
+    @Override
+    protected void doWriteExtendedData(final byte[] data, final int off, final long len) throws IOException {
+        // If we're already closing, ignore incoming data
+        if (isClosing()) {
+            return;
+        }
+        LOG.debug("Discarding {} bytes of extended data", len);
+        if (len > 0) {
+            getLocalWindow().release(len);
+        }
+    }
+}
index 58c5df0f6d071507f2ca2ae481a28cfb111895e5..6dcac4023a3a2641560a0f120450bae57476ed2c 100644 (file)
@@ -22,8 +22,6 @@ import org.opendaylight.netconf.shaded.sshd.client.auth.password.PasswordIdentit
 import org.opendaylight.netconf.shaded.sshd.client.auth.password.UserAuthPasswordFactory;
 import org.opendaylight.netconf.shaded.sshd.client.auth.pubkey.UserAuthPublicKeyFactory;
 import org.opendaylight.netconf.shaded.sshd.client.keyverifier.ServerKeyVerifier;
-import org.opendaylight.netconf.shaded.sshd.client.session.ClientSessionImpl;
-import org.opendaylight.netconf.shaded.sshd.client.session.SessionFactory;
 import org.opendaylight.netconf.shaded.sshd.common.keyprovider.KeyIdentityProvider;
 import org.opendaylight.netconf.shaded.sshd.netty.NettyIoServiceFactoryFactory;
 import org.opendaylight.netconf.transport.api.UnsupportedConfigurationException;
@@ -146,13 +144,7 @@ final class TransportSshClient extends SshClient {
                 throw new UnsupportedConfigurationException("Inconsistent client configuration", e);
             }
 
-            ret.setSessionFactory(new SessionFactory(ret) {
-                @Override
-                protected ClientSessionImpl setupSession(final ClientSessionImpl session) {
-                    session.setUsername(username);
-                    return session;
-                }
-            });
+            ret.setSessionFactory(new TransportClientSessionFactory(ret, username));
             return ret;
         }
 
index e68b7444c353374709615ccaa0dfa11e9d1b9433..dfbf1db714d373fedca0ac2fd3401c0ed7c45d7d 100644 (file)
@@ -11,6 +11,8 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertInstanceOf;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.timeout;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
@@ -53,6 +55,7 @@ import org.mockito.junit.jupiter.MockitoExtension;
 import org.opendaylight.netconf.shaded.sshd.client.session.ClientSession;
 import org.opendaylight.netconf.shaded.sshd.common.session.Session;
 import org.opendaylight.netconf.shaded.sshd.server.auth.password.UserAuthPasswordFactory;
+import org.opendaylight.netconf.shaded.sshd.server.command.Command;
 import org.opendaylight.netconf.shaded.sshd.server.keyprovider.SimpleGeneratorHostKeyProvider;
 import org.opendaylight.netconf.shaded.sshd.server.session.ServerSession;
 import org.opendaylight.netconf.shaded.sshd.server.subsystem.SubsystemFactory;
@@ -96,6 +99,8 @@ public class SshClientServerTest {
     private TransportChannelListener serverListener;
     @Mock
     private SubsystemFactory subsystemFactory;
+    @Mock
+    private Command subsystem;
 
     @Captor
     ArgumentCaptor<TransportChannel> clientTransportChannelCaptor;
@@ -132,6 +137,9 @@ public class SshClientServerTest {
         when(tcpClientConfig.requireRemoteAddress()).thenCallRealMethod();
         when(tcpClientConfig.getRemotePort()).thenReturn(localPort);
         when(tcpClientConfig.requireRemotePort()).thenCallRealMethod();
+
+        doReturn("subsystem").when(subsystemFactory).getName();
+        doReturn(subsystem).when(subsystemFactory).createSubsystem(any());
     }
 
     @ParameterizedTest(name = "SSH Server Host Key Verification -- {0}")
@@ -224,7 +232,7 @@ public class SshClientServerTest {
             .get(2, TimeUnit.SECONDS);
         try {
             // connect with client
-            final var client = FACTORY.connectClient(clientListener, tcpClientConfig, sshClientConfig)
+            final var client = FACTORY.connectClient("subsystem", clientListener, tcpClientConfig, sshClientConfig)
                 .get(2, TimeUnit.SECONDS);
             try {
                 verify(serverListener, timeout(10_000))
@@ -266,7 +274,7 @@ public class SshClientServerTest {
                 factoryManager.setKeyPairProvider(new SimpleGeneratorHostKeyProvider());
             }).get(2, TimeUnit.SECONDS);
         try {
-            final var client = FACTORY.connectClient(clientListener, tcpClientConfig, sshClientConfig)
+            final var client = FACTORY.connectClient("subsystem", clientListener, tcpClientConfig, sshClientConfig)
                 .get(2, TimeUnit.SECONDS);
             try {
                 verify(serverListener, timeout(10_000))