Clean up SSH server subsystem handling 04/108404/9
authorRobert Varga <robert.varga@pantheon.tech>
Fri, 13 Oct 2023 21:12:03 +0000 (23:12 +0200)
committerRobert Varga <robert.varga@pantheon.tech>
Sat, 14 Oct 2023 00:11:20 +0000 (02:11 +0200)
SSH transport should not report a connection until it has established
the expected transport channel.

This patch reworks the API to instantiation and internal wiring, so that
we invoke TransportChannelListener only once the subsystem has been
allocated.

This has the nice side-effect of reducing shaded.ssh package
proliferation, as things just end up being better encapsualted.

JIRA: NETCONF-1106
Change-Id: Ifcfc3a99d1fa323b105c87724c34f9bba018b78a
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
23 files changed:
apps/netconf-nb/src/main/java/org/opendaylight/netconf/northbound/SshServerTransport.java
apps/netconf-nb/src/main/java/org/opendaylight/netconf/northbound/TcpServerTransport.java
netconf/netconf-netty-util/src/main/java/org/opendaylight/netconf/nettyutil/AbstractNetconfSession.java
protocol/netconf-server/pom.xml
protocol/netconf-server/src/main/java/org/opendaylight/netconf/server/BaseServerTransport.java [deleted file]
protocol/netconf-server/src/main/java/org/opendaylight/netconf/server/NetconfServerFactoryImpl.java
protocol/netconf-server/src/main/java/org/opendaylight/netconf/server/NetconfSubsystem.java [deleted file]
protocol/netconf-server/src/main/java/org/opendaylight/netconf/server/NetconfSubsystemFactory.java [deleted file]
protocol/netconf-server/src/main/java/org/opendaylight/netconf/server/ServerTransportInitializer.java [moved from protocol/netconf-server/src/main/java/org/opendaylight/netconf/server/BaseTransportChannelListener.java with 50% similarity]
transport/transport-ssh/src/main/java/org/opendaylight/netconf/transport/ssh/OutboundChannelHandler.java
transport/transport-ssh/src/main/java/org/opendaylight/netconf/transport/ssh/SSHClient.java
transport/transport-ssh/src/main/java/org/opendaylight/netconf/transport/ssh/SSHServer.java
transport/transport-ssh/src/main/java/org/opendaylight/netconf/transport/ssh/SSHTransportStack.java
transport/transport-ssh/src/main/java/org/opendaylight/netconf/transport/ssh/SSHTransportStackFactory.java
transport/transport-ssh/src/main/java/org/opendaylight/netconf/transport/ssh/TransportChannelSession.java [new file with mode: 0644]
transport/transport-ssh/src/main/java/org/opendaylight/netconf/transport/ssh/TransportChannelSessionFactory.java [new file with mode: 0644]
transport/transport-ssh/src/main/java/org/opendaylight/netconf/transport/ssh/TransportClientSubsystem.java
transport/transport-ssh/src/main/java/org/opendaylight/netconf/transport/ssh/TransportServerSession.java [new file with mode: 0644]
transport/transport-ssh/src/main/java/org/opendaylight/netconf/transport/ssh/TransportServerSessionFactory.java [new file with mode: 0644]
transport/transport-ssh/src/main/java/org/opendaylight/netconf/transport/ssh/TransportServerSubsystem.java [new file with mode: 0644]
transport/transport-ssh/src/main/java/org/opendaylight/netconf/transport/ssh/TransportSshServer.java
transport/transport-ssh/src/main/java/org/opendaylight/netconf/transport/ssh/TransportUtils.java
transport/transport-ssh/src/test/java/org/opendaylight/netconf/transport/ssh/SshClientServerTest.java

index aa40dc51f43f02f7ecf038d36502b920574968a0..9260b42fb06e777a6ab3a847b359ac090ce77ef4 100644 (file)
@@ -11,9 +11,8 @@ import java.io.IOException;
 import java.util.List;
 import java.util.concurrent.ExecutionException;
 import org.opendaylight.netconf.auth.AuthProvider;
-import org.opendaylight.netconf.server.BaseTransportChannelListener;
-import org.opendaylight.netconf.server.NetconfSubsystemFactory;
 import org.opendaylight.netconf.server.ServerChannelInitializer;
+import org.opendaylight.netconf.server.ServerTransportInitializer;
 import org.opendaylight.netconf.shaded.sshd.server.auth.password.UserAuthPasswordFactory;
 import org.opendaylight.netconf.shaded.sshd.server.keyprovider.SimpleGeneratorHostKeyProvider;
 import org.opendaylight.netconf.transport.api.UnsupportedConfigurationException;
@@ -38,7 +37,7 @@ import org.slf4j.LoggerFactory;
  */
 @Component(service = { }, configurationPid = "org.opendaylight.netconf.ssh")
 @Designate(ocd = SshServerTransport.Configuration.class)
-public final class SshServerTransport extends BaseTransportChannelListener implements AutoCloseable {
+public final class SshServerTransport implements AutoCloseable {
     @ObjectClassDefinition
     public @interface Configuration {
         @AttributeDefinition
@@ -68,7 +67,7 @@ public final class SshServerTransport extends BaseTransportChannelListener imple
         final var localPort = listenParams.requireLocalPort().getValue();
 
         try {
-            sshServer = factoryHolder.factory().listenServer(this, new NetconfSubsystemFactory(initializer),
+            sshServer = factoryHolder.factory().listenServer("netconf", new ServerTransportInitializer(initializer),
                 listenParams, null, factoryMgr -> {
                     factoryMgr.setUserAuthFactories(List.of(UserAuthPasswordFactory.INSTANCE));
                     factoryMgr.setPasswordAuthenticator(
index d6819cc31d83b09284dc757b095a21f635ab2335..1fab52c8bbba36b1ca1a19f7f40274f6b866236b 100644 (file)
@@ -8,8 +8,8 @@
 package org.opendaylight.netconf.northbound;
 
 import java.util.concurrent.ExecutionException;
-import org.opendaylight.netconf.server.BaseServerTransport;
 import org.opendaylight.netconf.server.ServerChannelInitializer;
+import org.opendaylight.netconf.server.ServerTransportInitializer;
 import org.opendaylight.netconf.transport.api.UnsupportedConfigurationException;
 import org.opendaylight.netconf.transport.tcp.TCPServer;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.IetfInetUtil;
@@ -32,7 +32,7 @@ import org.slf4j.LoggerFactory;
  */
 @Component(service = {}, configurationPid = "org.opendaylight.netconf.tcp", enabled = false)
 @Designate(ocd = TcpServerTransport.Configuration.class)
-public final class TcpServerTransport extends BaseServerTransport implements AutoCloseable {
+public final class TcpServerTransport implements AutoCloseable {
     @ObjectClassDefinition
     public @interface Configuration {
         @AttributeDefinition
@@ -58,13 +58,12 @@ public final class TcpServerTransport extends BaseServerTransport implements Aut
 
     public TcpServerTransport(final TransportFactoryHolder factoryHolder, final ServerChannelInitializer initializer,
             final TcpServerGrouping listenParams) {
-        super(initializer);
-
         final var localAddr = listenParams.requireLocalAddress().stringValue();
         final var localPort = listenParams.requireLocalPort().getValue();
 
         try {
-            tcpServer = TCPServer.listen(this, factoryHolder.factory().newServerBootstrap(), listenParams).get();
+            tcpServer = TCPServer.listen(new ServerTransportInitializer(initializer),
+                factoryHolder.factory().newServerBootstrap(), listenParams).get();
         } catch (UnsupportedConfigurationException | ExecutionException | InterruptedException e) {
             LOG.warn("Could not start TCP NETCONF server at {}:{}", localAddr, localPort, e);
             throw new IllegalStateException("Could not start TCP NETCONF server", e);
index 3eaf83156f3494677196a20b808ae36a0e2588b4..7f1009613af2bee37144fe03a79172f7a7118505 100644 (file)
@@ -15,7 +15,6 @@ import io.netty.channel.ChannelHandler;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelPromise;
 import io.netty.channel.SimpleChannelInboundHandler;
-import io.netty.channel.embedded.EmbeddedChannel;
 import io.netty.handler.codec.ByteToMessageDecoder;
 import io.netty.handler.codec.MessageToByteEncoder;
 import java.io.EOFException;
@@ -93,14 +92,6 @@ public abstract class AbstractNetconfSession<S extends NetconfSession, L extends
             }
         });
 
-        // FIXME: NETCONF-1106: this is a workaround for netconf-server's NetconfSubsystem using EmbeddedChannel instead
-        //                      of correctly integrating with the underlying transport channel
-        if (channel instanceof EmbeddedChannel embeddedChannel) {
-            // Embedded event loop implementation has no executor, it requires explicit invocation to process
-            synchronized (channel) {
-                embeddedChannel.runPendingTasks();
-            }
-        }
         return promise;
     }
 
index de9011b5f9a0e997ded282d3b9272b83256f9052..11cf12d96b8354965447501e6399ef5861d0c5cb 100644 (file)
       <groupId>io.netty</groupId>
       <artifactId>netty-codec</artifactId>
     </dependency>
-    <dependency>
-      <groupId>io.netty</groupId>
-      <artifactId>netty-buffer</artifactId>
-    </dependency>
     <dependency>
       <groupId>io.netty</groupId>
       <artifactId>netty-common</artifactId>
diff --git a/protocol/netconf-server/src/main/java/org/opendaylight/netconf/server/BaseServerTransport.java b/protocol/netconf-server/src/main/java/org/opendaylight/netconf/server/BaseServerTransport.java
deleted file mode 100644 (file)
index 4bd8ea8..0000000
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Copyright (c) 2023 PANTHEON.tech, s.r.o. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.netconf.server;
-
-import static java.util.Objects.requireNonNull;
-
-import org.opendaylight.netconf.transport.api.TransportChannel;
-
-/**
- * Abstract base class for NETCONF server implementations working on top of a {@link TransportChannel}.
- */
-public class BaseServerTransport extends BaseTransportChannelListener {
-    private final ServerChannelInitializer initializer;
-
-    public BaseServerTransport(final ServerChannelInitializer initializer) {
-        this.initializer = requireNonNull(initializer);
-    }
-
-    @Override
-    public final void onTransportChannelEstablished(final TransportChannel channel) {
-        super.onTransportChannelEstablished(channel);
-        final var nettyChannel = channel.channel();
-        initializer.initialize(nettyChannel, nettyChannel.eventLoop().newPromise());
-    }
-}
index c2b5aae35a08b87251b9da576c59bc9cc3ff3a38..d5e971659cf7362c5423d0653e5135de9b3a0ebb 100644 (file)
@@ -11,7 +11,6 @@ import static java.util.Objects.requireNonNull;
 
 import com.google.common.util.concurrent.ListenableFuture;
 import org.opendaylight.netconf.server.api.NetconfServerFactory;
-import org.opendaylight.netconf.transport.api.TransportChannelListener;
 import org.opendaylight.netconf.transport.api.UnsupportedConfigurationException;
 import org.opendaylight.netconf.transport.ssh.SSHServer;
 import org.opendaylight.netconf.transport.ssh.SSHTransportStackFactory;
@@ -21,8 +20,6 @@ import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.ssh.server.
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.tcp.server.rev230417.TcpServerGrouping;
 
 public final class NetconfServerFactoryImpl implements NetconfServerFactory {
-    private static final TransportChannelListener EMPTY_LISTENER = new BaseTransportChannelListener();
-
     private final SSHTransportStackFactory factory;
     private final ServerChannelInitializer channelInitializer;
 
@@ -35,14 +32,15 @@ public final class NetconfServerFactoryImpl implements NetconfServerFactory {
     @Override
     public ListenableFuture<TCPServer> createTcpServer(final TcpServerGrouping params)
             throws UnsupportedConfigurationException {
-        return TCPServer.listen(new BaseServerTransport(channelInitializer), factory.newServerBootstrap(), params);
+        return TCPServer.listen(new ServerTransportInitializer(channelInitializer), factory.newServerBootstrap(),
+            params);
     }
 
     @Override
     public ListenableFuture<SSHServer> createSshServer(final TcpServerGrouping tcpParams,
             final SshServerGrouping sshParams, final ServerFactoryManagerConfigurator configurator)
                 throws UnsupportedConfigurationException {
-        return factory.listenServer(EMPTY_LISTENER, new NetconfSubsystemFactory(channelInitializer), tcpParams,
-            sshParams, configurator);
+        return factory.listenServer("netconf", new ServerTransportInitializer(channelInitializer), tcpParams, sshParams,
+            configurator);
     }
 }
diff --git a/protocol/netconf-server/src/main/java/org/opendaylight/netconf/server/NetconfSubsystem.java b/protocol/netconf-server/src/main/java/org/opendaylight/netconf/server/NetconfSubsystem.java
deleted file mode 100644 (file)
index a1906b5..0000000
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Copyright (c) 2023 PANTHEON.tech, s.r.o. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.netconf.server;
-
-import static java.util.Objects.requireNonNull;
-
-import io.netty.buffer.Unpooled;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelInboundHandlerAdapter;
-import io.netty.channel.embedded.EmbeddedChannel;
-import io.netty.util.concurrent.GlobalEventExecutor;
-import java.net.InetSocketAddress;
-import java.nio.charset.StandardCharsets;
-import org.opendaylight.netconf.api.messages.NetconfHelloMessageAdditionalHeader;
-import org.opendaylight.netconf.shaded.sshd.common.io.IoInputStream;
-import org.opendaylight.netconf.shaded.sshd.common.io.IoOutputStream;
-import org.opendaylight.netconf.shaded.sshd.server.channel.ChannelDataReceiver;
-import org.opendaylight.netconf.shaded.sshd.server.channel.ChannelSession;
-import org.opendaylight.netconf.shaded.sshd.server.channel.ChannelSessionAware;
-import org.opendaylight.netconf.shaded.sshd.server.command.AbstractCommandSupport;
-import org.opendaylight.netconf.shaded.sshd.server.command.AsyncCommand;
-import org.opendaylight.netconf.transport.ssh.OutboundChannelHandler;
-
-final class NetconfSubsystem extends AbstractCommandSupport
-        implements AsyncCommand, ChannelSessionAware, ChannelDataReceiver {
-    // FIXME: NETCONF-1106: do not use EmbeddedChannel here!
-    private final EmbeddedChannel innerChannel = new EmbeddedChannel();
-    private final ServerChannelInitializer channelInitializer;
-
-    NetconfSubsystem(final String name, final ServerChannelInitializer channelInitializer) {
-        super(name, null);
-        this.channelInitializer = requireNonNull(channelInitializer);
-    }
-
-    @Override
-    public void run() {
-        // not used
-    }
-
-    @Override
-    public void setIoInputStream(final IoInputStream in) {
-        // not used
-    }
-
-    @Override
-    public void setIoErrorStream(final IoOutputStream err) {
-        // not used
-    }
-
-    @Override
-    public void setIoOutputStream(final IoOutputStream out) {
-        /*
-         * While NETCONF protocol handlers are designed to operate over Netty channel, the inner channel is used to
-         * serve NETCONF over SSH.
-         */
-        // outbound packet handler, adding fist means it will be invoked last because of flow direction
-        innerChannel.pipeline().addFirst(new OutboundChannelHandler(out));
-
-        // inner channel termination handler
-        innerChannel.pipeline().addLast(
-            new ChannelInboundHandlerAdapter() {
-                @Override
-                public void channelInactive(final ChannelHandlerContext ctx) {
-                    onExit(0);
-                }
-            });
-
-        // NETCONF protocol handlers
-        channelInitializer.initialize(innerChannel, GlobalEventExecutor.INSTANCE.newPromise());
-        // trigger negotiation flow
-        innerChannel.pipeline().fireChannelActive();
-        // set additional info for upcoming netconf session
-        innerChannel.writeInbound(Unpooled.wrappedBuffer(getHelloAdditionalMessageBytes()));
-    }
-
-    @Override
-    public void setChannelSession(final ChannelSession channelSession) {
-        /*
-         * Inbound packets handler
-         * NOTE: The channel data receiver require to be set within current method, so it could be handled
-         * with subsequent logic of ChannelSession#prepareChannelCommand() where this method is executed from.
-         */
-        channelSession.setDataReceiver(this);
-    }
-
-    @Override
-    public int data(final ChannelSession channel, final byte[] buf, final int start, final int len) {
-        // Do not propagate empty invocations
-        if (len != 0) {
-            innerChannel.writeInbound(Unpooled.copiedBuffer(buf, start, len));
-        }
-        return len;
-    }
-
-    @Override
-    public void close() {
-        innerChannel.close();
-    }
-
-    @Override
-    protected void onExit(final int exitValue, final String exitMessage) {
-        super.onExit(exitValue, exitMessage);
-        innerChannel.close();
-    }
-
-    private byte[] getHelloAdditionalMessageBytes() {
-        final var session = getServerSession();
-        final var address = (InetSocketAddress) session.getClientAddress();
-        return new NetconfHelloMessageAdditionalHeader(session.getUsername(), address.getAddress().getHostAddress(),
-            String.valueOf(address.getPort()), "ssh", "client")
-            .toFormattedString().getBytes(StandardCharsets.UTF_8);
-    }
-}
\ No newline at end of file
diff --git a/protocol/netconf-server/src/main/java/org/opendaylight/netconf/server/NetconfSubsystemFactory.java b/protocol/netconf-server/src/main/java/org/opendaylight/netconf/server/NetconfSubsystemFactory.java
deleted file mode 100644 (file)
index 9324f6e..0000000
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Copyright (c) 2023 PANTHEON.tech s.r.o. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.netconf.server;
-
-import static java.util.Objects.requireNonNull;
-
-import java.io.IOException;
-import org.opendaylight.netconf.shaded.sshd.server.channel.ChannelSession;
-import org.opendaylight.netconf.shaded.sshd.server.command.Command;
-import org.opendaylight.netconf.shaded.sshd.server.subsystem.SubsystemFactory;
-
-public final class NetconfSubsystemFactory implements SubsystemFactory {
-    private static final String NETCONF = "netconf";
-
-    private final ServerChannelInitializer channelInitializer;
-
-    public NetconfSubsystemFactory(final ServerChannelInitializer channelInitializer) {
-        this.channelInitializer = requireNonNull(channelInitializer);
-    }
-
-    @Override
-    public String getName() {
-        return NETCONF;
-    }
-
-    @Override
-    public Command createSubsystem(final ChannelSession channel) throws IOException {
-        return new NetconfSubsystem(NETCONF, channelInitializer);
-    }
-}
similarity index 50%
rename from protocol/netconf-server/src/main/java/org/opendaylight/netconf/server/BaseTransportChannelListener.java
rename to protocol/netconf-server/src/main/java/org/opendaylight/netconf/server/ServerTransportInitializer.java
index 8a695b697944d9f479ed08013ab6b734952b51c8..5f8346a31eb9f7cd3344444e3141a5b21675b02a 100644 (file)
@@ -7,21 +7,35 @@
  */
 package org.opendaylight.netconf.server;
 
+import static java.util.Objects.requireNonNull;
+
 import org.opendaylight.netconf.transport.api.TransportChannel;
 import org.opendaylight.netconf.transport.api.TransportChannelListener;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class BaseTransportChannelListener implements TransportChannelListener {
-    private static final Logger LOG = LoggerFactory.getLogger(BaseTransportChannelListener.class);
+/**
+ * A {@link TransportChannelListener} which initializes NETCONF server implementations working on top
+ * of a {@link TransportChannel}.
+ */
+public final class ServerTransportInitializer implements TransportChannelListener {
+    private static final Logger LOG = LoggerFactory.getLogger(ServerTransportInitializer.class);
+
+    private final ServerChannelInitializer initializer;
+
+    public ServerTransportInitializer(final ServerChannelInitializer initializer) {
+        this.initializer = requireNonNull(initializer);
+    }
 
     @Override
     public void onTransportChannelEstablished(final TransportChannel channel) {
         LOG.debug("Transport channel {} established", channel);
+        final var nettyChannel = channel.channel();
+        initializer.initialize(nettyChannel, nettyChannel.eventLoop().newPromise());
     }
 
     @Override
-    public final void onTransportChannelFailed(final Throwable cause) {
+    public void onTransportChannelFailed(final Throwable cause) {
         LOG.error("Transport channel failed", cause);
     }
 }
index adb833908156e421815bea8d34910b9c883e6145..c7db99a9f9e3004aa9e915614c85aae50618a901 100644 (file)
@@ -24,13 +24,12 @@ import org.slf4j.LoggerFactory;
  * A ChannelOutboundHandler responsible for redirecting whatever bytes need to be written out on the Netty channel so
  * that they pass into SSHD's output.
  */
-// FIXME: NETCONF-1106: hide this class if possible
-public final class OutboundChannelHandler extends ChannelOutboundHandlerAdapter {
+final class OutboundChannelHandler extends ChannelOutboundHandlerAdapter {
     private static final Logger LOG = LoggerFactory.getLogger(OutboundChannelHandler.class);
 
     private final IoOutputStream out;
 
-    public OutboundChannelHandler(final IoOutputStream out) {
+    OutboundChannelHandler(final IoOutputStream out) {
         this.out = requireNonNull(out);
     }
 
index c8e465aef754eeaa20bc38506cab6b1d3bae19ef..604c0ceb56316fb9a4a6dcad34f3ea7a02f04177 100644 (file)
@@ -89,11 +89,7 @@ public final class SSHClient extends SSHTransportStack {
         final var sessionId = sessionId(session);
         LOG.debug("Opening \"{}\" subsystem on session {}", subsystem, sessionId);
 
-        final var underlay = underlayOf(sessionId);
-        if (underlay == null) {
-            throw new IOException("Cannot find underlay for " + session);
-        }
-
+        final var underlay = getUnderlayOf(sessionId);
         final var clientSession = cast(session);
         final var channel = clientSession.createSubsystemChannel(subsystem);
         channel.onClose(() -> clientSession.close(true));
@@ -102,8 +98,7 @@ public final class SSHClient extends SSHTransportStack {
 
     private void onSubsystemOpenComplete(final OpenFuture future, final Long sessionId) {
         if (future.isOpened()) {
-            LOG.debug("Established transport on session {}", sessionId);
-            completeUnderlay(sessionId, underlay -> addTransportChannel(new SSHTransportChannel(underlay)));
+            transportEstablished(sessionId);
         } else {
             LOG.error("Failed to establish transport on session {}", sessionId, future.getException());
             deleteSession(sessionId);
@@ -111,9 +106,6 @@ public final class SSHClient extends SSHTransportStack {
     }
 
     private static TransportClientSession cast(final Session session) throws IOException {
-        if (session instanceof TransportClientSession clientSession) {
-            return clientSession;
-        }
-        throw new IOException("Unexpected session " + session);
+        return TransportUtils.checkCast(TransportClientSession.class, session);
     }
 }
index 8631e105ae05a3aad4cebe5fba3f2b7260f3b0dc..49e62dc3caf6cff21a389e40fefb896454b0e08d 100644 (file)
@@ -7,14 +7,19 @@
  */
 package org.opendaylight.netconf.transport.ssh;
 
+import static java.util.Objects.requireNonNull;
+
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
 import io.netty.bootstrap.Bootstrap;
 import io.netty.bootstrap.ServerBootstrap;
 import io.netty.channel.EventLoopGroup;
+import java.io.IOException;
 import org.eclipse.jdt.annotation.NonNull;
 import org.opendaylight.netconf.shaded.sshd.common.session.Session;
 import org.opendaylight.netconf.shaded.sshd.netty.NettyIoServiceFactoryFactory;
-import org.opendaylight.netconf.shaded.sshd.server.subsystem.SubsystemFactory;
 import org.opendaylight.netconf.transport.api.TransportChannelListener;
 import org.opendaylight.netconf.transport.api.TransportStack;
 import org.opendaylight.netconf.transport.api.UnsupportedConfigurationException;
@@ -23,6 +28,7 @@ import org.opendaylight.netconf.transport.tcp.TCPServer;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.ssh.server.rev230417.SshServerGrouping;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.tcp.client.rev230417.TcpClientGrouping;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.tcp.server.rev230417.TcpServerGrouping;
+import org.opendaylight.yangtools.yang.common.Empty;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -32,18 +38,22 @@ import org.slf4j.LoggerFactory;
 public final class SSHServer extends SSHTransportStack {
     private static final Logger LOG = LoggerFactory.getLogger(SSHServer.class);
 
-    private SSHServer(final TransportChannelListener listener, final TransportSshServer sshServer) {
+    private final String subsystem;
+
+    private SSHServer(final String subsystem, final TransportChannelListener listener,
+            final TransportSshServer sshServer) {
         super(listener, sshServer, sshServer.getSessionFactory());
+        this.subsystem = requireNonNull(subsystem);
     }
 
     static SSHServer of(final NettyIoServiceFactoryFactory ioServiceFactory, final EventLoopGroup group,
-            final TransportChannelListener listener, final SubsystemFactory subsystemFactory,
-            final SshServerGrouping serverParams, final ServerFactoryManagerConfigurator configurator)
-                throws UnsupportedConfigurationException {
-        return new SSHServer(listener, new TransportSshServer.Builder(ioServiceFactory, group, subsystemFactory)
-            .serverParams(serverParams)
-            .configurator(configurator)
-            .buildChecked());
+            final String subsystem, final TransportChannelListener listener, final SshServerGrouping serverParams,
+            final ServerFactoryManagerConfigurator configurator) throws UnsupportedConfigurationException {
+        return new SSHServer(subsystem, listener,
+            new TransportSshServer.Builder(ioServiceFactory, group)
+                .serverParams(serverParams)
+                .configurator(configurator)
+                .buildChecked());
     }
 
     @NonNull ListenableFuture<SSHServer> connect(final Bootstrap bootstrap, final TcpClientGrouping connectParams)
@@ -62,10 +72,27 @@ public final class SSHServer extends SSHTransportStack {
     }
 
     @Override
-    void onAuthenticated(final Session session) {
+    void onAuthenticated(final Session session) throws IOException {
         final var sessionId = sessionId(session);
-        LOG.debug("Established transport on session {}", sessionId);
-        // FIXME: we should wait for the subsystem to be created and then finish
-        completeUnderlay(sessionId, underlay -> addTransportChannel(new SSHTransportChannel(underlay)));
+        LOG.debug("Awaiting \"{}\" subsystem on session {}", subsystem, sessionId);
+
+        Futures.addCallback(cast(session).attachUnderlay(subsystem, getUnderlayOf(sessionId)), new FutureCallback<>() {
+            @Override
+            public void onSuccess(final Empty result) {
+                // Note: we re-validating the underlay ... we may need to refactor state management to make this
+                //       non-awkward
+                transportEstablished(sessionId);
+            }
+
+            @Override
+            public void onFailure(final Throwable cause) {
+                LOG.debug("Transport on session {} failed", sessionId, cause);
+                deleteSession(sessionId);
+            }
+        }, MoreExecutors.directExecutor());
+    }
+
+    private static TransportServerSession cast(final Session session) throws IOException {
+        return TransportUtils.checkCast(TransportServerSession.class, session);
     }
-}
\ No newline at end of file
+}
index c637ffbecdceb27324277aadc7a54a69800d4895..cc515f8d3b42c38a325e20c375d88585be4ce628 100644 (file)
@@ -14,7 +14,7 @@ import java.util.Collection;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.function.Consumer;
-import org.eclipse.jdt.annotation.Nullable;
+import org.eclipse.jdt.annotation.NonNull;
 import org.opendaylight.netconf.shaded.sshd.common.FactoryManager;
 import org.opendaylight.netconf.shaded.sshd.common.SshConstants;
 import org.opendaylight.netconf.shaded.sshd.common.io.IoHandler;
@@ -146,8 +146,12 @@ public abstract sealed class SSHTransportStack extends AbstractOverlayTransportS
 
     abstract void onAuthenticated(Session session) throws IOException;
 
-    final @Nullable TransportChannel underlayOf(final Long sessionId) {
-        return underlays.get(sessionId);
+    final @NonNull TransportChannel getUnderlayOf(final Long sessionId) throws IOException {
+        final var ret = underlays.get(sessionId);
+        if (ret == null) {
+            throw new IOException("Cannot find underlay for " + sessionId);
+        }
+        return ret;
     }
 
     final void deleteSession(final Long sessionId) {
@@ -156,7 +160,15 @@ public abstract sealed class SSHTransportStack extends AbstractOverlayTransportS
         completeUnderlay(sessionId, underlay -> underlay.channel().close());
     }
 
-    final void completeUnderlay(final Long sessionId, final Consumer<TransportChannel> action) {
+    // FIXME: this should be an assertion, the channel should just be there
+    final void transportEstablished(final Long sessionId) {
+        completeUnderlay(sessionId, underlay -> {
+            LOG.debug("Established transport on session {}", sessionId);
+            addTransportChannel(new SSHTransportChannel(underlay));
+        });
+    }
+
+    private void completeUnderlay(final Long sessionId, final Consumer<TransportChannel> action) {
         final var removed = underlays.remove(sessionId);
         if (removed != null) {
             action.accept(removed);
index bb31fe03a85aab3392b10d03a3f05deba2344fb8..a493de5917e39e21b531f987dce14887ff29e166 100644 (file)
@@ -16,7 +16,6 @@ import io.netty.bootstrap.ServerBootstrap;
 import io.netty.channel.EventLoopGroup;
 import org.eclipse.jdt.annotation.NonNull;
 import org.opendaylight.netconf.shaded.sshd.netty.NettyIoServiceFactoryFactory;
-import org.opendaylight.netconf.shaded.sshd.server.subsystem.SubsystemFactory;
 import org.opendaylight.netconf.transport.api.TransportChannelListener;
 import org.opendaylight.netconf.transport.api.UnsupportedConfigurationException;
 import org.opendaylight.netconf.transport.tcp.NettyTransportSupport;
@@ -63,24 +62,24 @@ public final class SSHTransportStackFactory implements AutoCloseable {
             .listen(newServerBootstrap(), listenParams);
     }
 
-    public @NonNull ListenableFuture<SSHServer> connectServer(final TransportChannelListener listener,
-            final SubsystemFactory subsystemFactory, final TcpClientGrouping connectParams,
+    public @NonNull ListenableFuture<SSHServer> connectServer(final String subsystem,
+            final TransportChannelListener listener, final TcpClientGrouping connectParams,
             final SshServerGrouping serverParams) throws UnsupportedConfigurationException {
-        return SSHServer.of(ioServiceFactory, group, listener, subsystemFactory, requireNonNull(serverParams), null)
+        return SSHServer.of(ioServiceFactory, group, subsystem, listener, requireNonNull(serverParams), null)
             .connect(newBootstrap(), connectParams);
     }
 
-    public @NonNull ListenableFuture<SSHServer> listenServer(final TransportChannelListener listener,
-            final SubsystemFactory subsystemFactory, final TcpServerGrouping connectParams,
+    public @NonNull ListenableFuture<SSHServer> listenServer(final String subsystem,
+            final TransportChannelListener listener, final TcpServerGrouping connectParams,
             final SshServerGrouping serverParams) throws UnsupportedConfigurationException {
-        return listenServer(listener, subsystemFactory, connectParams, requireNonNull(serverParams), null);
+        return listenServer(subsystem, listener, connectParams, requireNonNull(serverParams), null);
     }
 
     /**
      * Builds and starts SSH Server.
      *
      * @param listener server channel listener, required
-     * @param subsystemFactory A {@link SubsystemFactory} for the hosted subsystem
+     * @param subsystem bound subsystem name
      * @param listenParams TCP transport configuration, required
      * @param serverParams SSH overlay configuration, optional if configurator is defined, required otherwise
      * @param configurator server factory manager configurator, optional if serverParams is defined, required otherwise
@@ -89,13 +88,13 @@ public final class SSHTransportStackFactory implements AutoCloseable {
      * @throws NullPointerException if any of required parameters is null
      * @throws IllegalArgumentException if both configurator and serverParams are null
      */
-    public @NonNull ListenableFuture<SSHServer> listenServer(final TransportChannelListener listener,
-            final SubsystemFactory subsystemFactory, final TcpServerGrouping listenParams,
+    public @NonNull ListenableFuture<SSHServer> listenServer(final String subsystem,
+            final TransportChannelListener listener, final TcpServerGrouping listenParams,
             final SshServerGrouping serverParams, final ServerFactoryManagerConfigurator configurator)
-                    throws UnsupportedConfigurationException {
+                throws UnsupportedConfigurationException {
         checkArgument(serverParams != null || configurator != null,
             "Neither server parameters nor factory configurator is defined");
-        return SSHServer.of(ioServiceFactory, group, listener, subsystemFactory, serverParams, configurator)
+        return SSHServer.of(ioServiceFactory, group, subsystem, listener, serverParams, configurator)
             .listen(newServerBootstrap(), listenParams);
     }
 
diff --git a/transport/transport-ssh/src/main/java/org/opendaylight/netconf/transport/ssh/TransportChannelSession.java b/transport/transport-ssh/src/main/java/org/opendaylight/netconf/transport/ssh/TransportChannelSession.java
new file mode 100644 (file)
index 0000000..7b98a27
--- /dev/null
@@ -0,0 +1,38 @@
+/*
+ * Copyright (c) 2023 PANTHEON.tech, s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.netconf.transport.ssh;
+
+import static java.util.Objects.requireNonNull;
+
+import java.io.IOException;
+import org.opendaylight.netconf.shaded.sshd.common.channel.RequestHandler;
+import org.opendaylight.netconf.shaded.sshd.server.channel.ChannelSession;
+
+/**
+ * Our own version of {@link ChannelSession}, bound to a backend Netty channel.
+ */
+final class TransportChannelSession extends ChannelSession {
+    private final TransportServerSession serverSession;
+
+    TransportChannelSession(final TransportServerSession serverSession) {
+        this.serverSession = requireNonNull(serverSession);
+    }
+
+    @Override
+    protected RequestHandler.Result handleSubsystemParsed(final String request, final String subsystem)
+            throws IOException {
+        final var openSubsystem = serverSession.openSubsystem(subsystem);
+        if (openSubsystem == null) {
+            log.warn("handleSubsystemParsed({}) Unsupported subsystem: {}", this, subsystem);
+            return RequestHandler.Result.ReplyFailure;
+        }
+
+        commandInstance = openSubsystem;
+        return prepareChannelCommand(request, commandInstance);
+    }
+}
diff --git a/transport/transport-ssh/src/main/java/org/opendaylight/netconf/transport/ssh/TransportChannelSessionFactory.java b/transport/transport-ssh/src/main/java/org/opendaylight/netconf/transport/ssh/TransportChannelSessionFactory.java
new file mode 100644 (file)
index 0000000..488f938
--- /dev/null
@@ -0,0 +1,34 @@
+/*
+ * Copyright (c) 2023 PANTHEON.tech, s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.netconf.transport.ssh;
+
+import java.io.IOException;
+import org.opendaylight.netconf.shaded.sshd.common.channel.ChannelFactory;
+import org.opendaylight.netconf.shaded.sshd.common.session.Session;
+
+/**
+ * A {@link ChannelFactory} used with {@link TransportServerSession}s.
+ */
+final class TransportChannelSessionFactory implements ChannelFactory {
+    static final TransportChannelSessionFactory INSTANCE = new TransportChannelSessionFactory();
+
+    private TransportChannelSessionFactory() {
+        // Hidden on purpose
+    }
+
+    @Override
+    public String getName() {
+        // mimic ChannelSessionFactory without referencing it
+        return "session";
+    }
+
+    @Override
+    public TransportChannelSession createChannel(final Session session) throws IOException {
+        return new TransportChannelSession(TransportUtils.checkCast(TransportServerSession.class, session));
+    }
+}
index 80a0a1ef6211054f4fea18ffba99f89d1818277f..3f823d03596e3da0c47f69082a01bddd2e1f0367 100644 (file)
@@ -10,7 +10,6 @@ package org.opendaylight.netconf.transport.ssh;
 import com.google.errorprone.annotations.DoNotCall;
 import io.netty.buffer.Unpooled;
 import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelInboundHandlerAdapter;
 import java.io.IOException;
 import org.opendaylight.netconf.shaded.sshd.client.channel.ChannelSubsystem;
 import org.opendaylight.netconf.shaded.sshd.client.future.OpenFuture;
@@ -24,7 +23,7 @@ import org.slf4j.LoggerFactory;
 final class TransportClientSubsystem extends ChannelSubsystem {
     private static final Logger LOG = LoggerFactory.getLogger(TransportClientSubsystem.class);
 
-    private ChannelHandlerContext pipelineHead;
+    private ChannelHandlerContext head;
 
     TransportClientSubsystem(final String subsystem) {
         super(subsystem);
@@ -46,27 +45,11 @@ final class TransportClientSubsystem extends ChannelSubsystem {
     }
 
     private void onOpenComplete(final OpenFuture future, final TransportChannel underlay) {
-        if (!future.isOpened()) {
+        if (future.isOpened()) {
+            head = TransportUtils.attachUnderlay(getAsyncIn(), underlay, this::close);
+        } else {
             LOG.debug("Failed to open client subsystem \"{}\"", getSubsystem(), future.getException());
-            return;
         }
-
-        // Note that there may be multiple handlers already present on the channel, hence we are attaching last, but
-        // from the logical perspective we are the head handlers.
-        final var pipeline = underlay.channel().pipeline();
-
-        // - install outbound packet handler, i.e. moving bytes from the channel into SSHD's pipeline
-        pipeline.addLast(new OutboundChannelHandler(getAsyncIn()));
-        // - remember the context of this handler, we will be using it to issue writes into the channel
-        pipelineHead = pipeline.lastContext();
-
-        // - install inner channel termination handler
-        pipeline.addLast(new ChannelInboundHandlerAdapter() {
-            @Override
-            public void channelInactive(final ChannelHandlerContext ctx) throws IOException {
-                close();
-            }
-        });
     }
 
     @Override
@@ -79,7 +62,7 @@ final class TransportClientSubsystem extends ChannelSubsystem {
         final int reqLen = (int) len;
         if (reqLen > 0) {
             LOG.debug("Forwarding {} bytes of data", reqLen);
-            pipelineHead.fireChannelRead(Unpooled.copiedBuffer(data, off, reqLen));
+            head.fireChannelRead(Unpooled.copiedBuffer(data, off, reqLen));
             getLocalWindow().release(reqLen);
         }
     }
diff --git a/transport/transport-ssh/src/main/java/org/opendaylight/netconf/transport/ssh/TransportServerSession.java b/transport/transport-ssh/src/main/java/org/opendaylight/netconf/transport/ssh/TransportServerSession.java
new file mode 100644 (file)
index 0000000..28170ac
--- /dev/null
@@ -0,0 +1,73 @@
+/*
+ * Copyright (c) 2023 PANTHEON.tech, s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.netconf.transport.ssh;
+
+import static java.util.Objects.requireNonNull;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.lang.invoke.VarHandle;
+import org.eclipse.jdt.annotation.Nullable;
+import org.opendaylight.netconf.shaded.sshd.common.io.IoSession;
+import org.opendaylight.netconf.shaded.sshd.server.session.ServerSessionImpl;
+import org.opendaylight.netconf.transport.api.TransportChannel;
+import org.opendaylight.yangtools.yang.common.Empty;
+
+/**
+ * A {@link ServerSessionImpl}, bound to a backend Netty channel.
+ */
+final class TransportServerSession extends ServerSessionImpl {
+    private record State(String subsystem, TransportChannel underlay, SettableFuture<Empty> future) {
+        State {
+            subsystem = requireNonNull(subsystem);
+            underlay = requireNonNull(underlay);
+            future = requireNonNull(future);
+        }
+    }
+
+    private static final VarHandle STATE;
+
+    static {
+        try {
+            STATE = MethodHandles.lookup().findVarHandle(TransportServerSession.class, "state", State.class);
+        } catch (NoSuchFieldException | IllegalAccessException e) {
+            throw new ExceptionInInitializerError(e);
+        }
+    }
+
+    @SuppressWarnings("unused")
+    private volatile State state;
+
+    TransportServerSession(final TransportSshServer server, final IoSession ioSession) throws Exception {
+        super(server, ioSession);
+    }
+
+    ListenableFuture<Empty> attachUnderlay(final String subsystem, final TransportChannel underlay) {
+        final var newState = new State(subsystem, underlay, SettableFuture.create());
+        final var witness = STATE.compareAndExchange(this, null, newState);
+        if (witness != null) {
+            throw new IllegalStateException("Already set up for " + witness);
+        }
+        return newState.future;
+    }
+
+    @Nullable TransportServerSubsystem openSubsystem(final String subsystem) {
+        final var local = (State) STATE.getAndSet(this, null);
+        if (local != null) {
+            if (subsystem.equals(local.subsystem)) {
+                final var ret = new TransportServerSubsystem(subsystem, local.underlay);
+                local.future.set(Empty.value());
+                return ret;
+            }
+            local.future.setException(new IOException("Mismatched subsystem " + subsystem));
+        }
+        return null;
+    }
+}
diff --git a/transport/transport-ssh/src/main/java/org/opendaylight/netconf/transport/ssh/TransportServerSessionFactory.java b/transport/transport-ssh/src/main/java/org/opendaylight/netconf/transport/ssh/TransportServerSessionFactory.java
new file mode 100644 (file)
index 0000000..4107b36
--- /dev/null
@@ -0,0 +1,25 @@
+/*
+ * Copyright (c) 2023 PANTHEON.tech, s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.netconf.transport.ssh;
+
+import org.opendaylight.netconf.shaded.sshd.common.io.IoSession;
+import org.opendaylight.netconf.shaded.sshd.server.session.SessionFactory;
+
+/**
+ * A {@link SessionFactory} tied to a {@link TransportSshServer}.
+ */
+final class TransportServerSessionFactory extends SessionFactory {
+    TransportServerSessionFactory(final TransportSshServer server) {
+        super(server);
+    }
+
+    @Override
+    protected TransportServerSession doCreateSession(final IoSession ioSession) throws Exception {
+        return new TransportServerSession((TransportSshServer) getServer(), ioSession);
+    }
+}
diff --git a/transport/transport-ssh/src/main/java/org/opendaylight/netconf/transport/ssh/TransportServerSubsystem.java b/transport/transport-ssh/src/main/java/org/opendaylight/netconf/transport/ssh/TransportServerSubsystem.java
new file mode 100644 (file)
index 0000000..97ed3a7
--- /dev/null
@@ -0,0 +1,92 @@
+/*
+ * Copyright (c) 2023 PANTHEON.tech, s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.netconf.transport.ssh;
+
+import static java.util.Objects.requireNonNull;
+
+import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelHandlerContext;
+import org.opendaylight.netconf.shaded.sshd.common.io.IoInputStream;
+import org.opendaylight.netconf.shaded.sshd.common.io.IoOutputStream;
+import org.opendaylight.netconf.shaded.sshd.server.channel.ChannelDataReceiver;
+import org.opendaylight.netconf.shaded.sshd.server.channel.ChannelSession;
+import org.opendaylight.netconf.shaded.sshd.server.channel.ChannelSessionAware;
+import org.opendaylight.netconf.shaded.sshd.server.command.AbstractCommandSupport;
+import org.opendaylight.netconf.shaded.sshd.server.command.AsyncCommand;
+import org.opendaylight.netconf.transport.api.TransportChannel;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+final class TransportServerSubsystem extends AbstractCommandSupport
+        implements AsyncCommand, ChannelSessionAware, ChannelDataReceiver {
+    private static final Logger LOG = LoggerFactory.getLogger(TransportServerSubsystem.class);
+
+    private final TransportChannel underlay;
+
+    private ChannelHandlerContext head;
+
+    TransportServerSubsystem(final String name, final TransportChannel underlay) {
+        super(name, null);
+        this.underlay = requireNonNull(underlay);
+    }
+
+    @Override
+    public void run() {
+        // not used
+    }
+
+    @Override
+    public void setIoInputStream(final IoInputStream in) {
+        // not used
+    }
+
+    @Override
+    public void setIoErrorStream(final IoOutputStream err) {
+        // not used
+    }
+
+    @Override
+    public void setIoOutputStream(final IoOutputStream out) {
+        head = TransportUtils.attachUnderlay(out, underlay, () -> onExit(0));
+            // set additional info for upcoming netconf session
+//            .fireChannelRead(Unpooled.wrappedBuffer(getHelloAdditionalMessageBytes()));
+    }
+
+    @Override
+    public void setChannelSession(final ChannelSession channelSession) {
+        /*
+         * Inbound packets handler
+         * NOTE: The channel data receiver require to be set within current method, so it could be handled
+         * with subsequent logic of ChannelSession#prepareChannelCommand() where this method is executed from.
+         */
+        channelSession.setDataReceiver(this);
+    }
+
+    @Override
+    public int data(final ChannelSession channel, final byte[] buf, final int start, final int len) {
+        // Do not propagate empty invocations
+        if (len > 0) {
+            LOG.debug("Forwarding {} bytes of data", len);
+            head.fireChannelRead(Unpooled.copiedBuffer(buf, start, len));
+        }
+        return len;
+    }
+
+    @Override
+    public void close() {
+        // No-op?
+    }
+//
+//    private byte[] getHelloAdditionalMessageBytes() {
+//        final var session = getServerSession();
+//        final var address = (InetSocketAddress) session.getClientAddress();
+//        return new NetconfHelloMessageAdditionalHeader(session.getUsername(), address.getAddress().getHostAddress(),
+//            String.valueOf(address.getPort()), "ssh", "client")
+//            .toFormattedString().getBytes(StandardCharsets.UTF_8);
+//    }
+}
\ No newline at end of file
index 2a51b40443974a7af880523c3f411d234469f231..3bc51cab6c6daa30a39e942ddb32575b6d56dc6b 100644 (file)
@@ -15,6 +15,7 @@ import com.google.errorprone.annotations.DoNotCall;
 import io.netty.channel.EventLoopGroup;
 import java.security.PublicKey;
 import java.util.List;
+import org.opendaylight.netconf.shaded.sshd.common.channel.ChannelFactory;
 import org.opendaylight.netconf.shaded.sshd.common.keyprovider.KeyPairProvider;
 import org.opendaylight.netconf.shaded.sshd.netty.NettyIoServiceFactoryFactory;
 import org.opendaylight.netconf.shaded.sshd.server.ServerBuilder;
@@ -23,7 +24,7 @@ import org.opendaylight.netconf.shaded.sshd.server.auth.UserAuthFactory;
 import org.opendaylight.netconf.shaded.sshd.server.auth.hostbased.UserAuthHostBasedFactory;
 import org.opendaylight.netconf.shaded.sshd.server.auth.password.UserAuthPasswordFactory;
 import org.opendaylight.netconf.shaded.sshd.server.auth.pubkey.UserAuthPublicKeyFactory;
-import org.opendaylight.netconf.shaded.sshd.server.subsystem.SubsystemFactory;
+import org.opendaylight.netconf.shaded.sshd.server.forward.DirectTcpipFactory;
 import org.opendaylight.netconf.transport.api.UnsupportedConfigurationException;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.ssh.server.rev230417.SshServerGrouping;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.ssh.server.rev230417.ssh.server.grouping.ClientAuthentication;
@@ -68,20 +69,21 @@ final class TransportSshServer extends SshServer {
      * {@code ietf-netconf-server.yang} configuration.
      */
     static final class Builder extends ServerBuilder {
+        private static final List<ChannelFactory> CHANNEL_FACTORIES = List.of(
+            TransportChannelSessionFactory.INSTANCE,
+            DirectTcpipFactory.INSTANCE);
+
         private final NettyIoServiceFactoryFactory ioServiceFactory;
         private final EventLoopGroup group;
-        private final SubsystemFactory subsystemFactory;
 
         private ServerFactoryManagerConfigurator configurator;
         private ClientAuthentication clientAuthentication;
         private ServerIdentity serverIdentity;
         private Keepalives keepAlives;
 
-        Builder(final NettyIoServiceFactoryFactory ioServiceFactory, final EventLoopGroup group,
-                final SubsystemFactory subsystemFactory) {
+        Builder(final NettyIoServiceFactoryFactory ioServiceFactory, final EventLoopGroup group) {
             this.ioServiceFactory = requireNonNull(ioServiceFactory);
             this.group = requireNonNull(group);
-            this.subsystemFactory = requireNonNull(subsystemFactory);
         }
 
         Builder serverParams(final SshServerGrouping serverParams) throws UnsupportedConfigurationException {
@@ -144,7 +146,6 @@ final class TransportSshServer extends SshServer {
                 configurator.configureServerFactoryManager(ret);
             }
 
-            ret.setSubsystemFactories(List.of(subsystemFactory));
             ret.setIoServiceFactoryFactory(ioServiceFactory);
             ret.setScheduledExecutorService(group);
 
@@ -154,12 +155,15 @@ final class TransportSshServer extends SshServer {
                 throw new UnsupportedConfigurationException("Inconsistent client configuration", e);
             }
 
-            ret.setSessionFactory(ret.createSessionFactory());
+            ret.setSessionFactory(new TransportServerSessionFactory(ret));
             return ret;
         }
 
         @Override
         protected ServerBuilder fillWithDefaultValues() {
+            if (channelFactories == null) {
+                channelFactories = CHANNEL_FACTORIES;
+            }
             if (factory == null) {
                 factory = TransportSshServer::new;
             }
index de7e3900314a9e27cc5314c8dd4f3f979f07f407..a8e915b9bce71d2440068ac9c0093047a73a5c47 100644 (file)
@@ -7,10 +7,15 @@
  */
 package org.opendaylight.netconf.transport.ssh;
 
+import static java.util.Objects.requireNonNull;
+
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import java.io.IOException;
 import java.util.List;
 import java.util.Map;
 import org.eclipse.jdt.annotation.Nullable;
@@ -19,6 +24,7 @@ import org.opendaylight.netconf.shaded.sshd.common.BaseBuilder;
 import org.opendaylight.netconf.shaded.sshd.common.NamedFactory;
 import org.opendaylight.netconf.shaded.sshd.common.cipher.BuiltinCiphers;
 import org.opendaylight.netconf.shaded.sshd.common.cipher.Cipher;
+import org.opendaylight.netconf.shaded.sshd.common.io.IoOutputStream;
 import org.opendaylight.netconf.shaded.sshd.common.kex.BuiltinDHFactories;
 import org.opendaylight.netconf.shaded.sshd.common.kex.KeyExchangeFactory;
 import org.opendaylight.netconf.shaded.sshd.common.mac.BuiltinMacs;
@@ -26,6 +32,7 @@ import org.opendaylight.netconf.shaded.sshd.common.mac.Mac;
 import org.opendaylight.netconf.shaded.sshd.common.signature.BuiltinSignatures;
 import org.opendaylight.netconf.shaded.sshd.common.signature.Signature;
 import org.opendaylight.netconf.shaded.sshd.server.ServerBuilder;
+import org.opendaylight.netconf.transport.api.TransportChannel;
 import org.opendaylight.netconf.transport.api.UnsupportedConfigurationException;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.iana.ssh._public.key.algs.rev220616.EcdsaSha2Nistp256;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.iana.ssh._public.key.algs.rev220616.EcdsaSha2Nistp384;
@@ -266,4 +273,40 @@ final class TransportUtils {
         }
         return builder.build();
     }
+
+    static <T> T checkCast(final Class<T> clazz, final Object obj) throws IOException {
+        try {
+            return clazz.cast(requireNonNull(obj));
+        } catch (ClassCastException e) {
+            throw new IOException(e);
+        }
+    }
+
+    @FunctionalInterface
+    interface ChannelInactive {
+
+        void onChannelInactive() throws Exception;
+    }
+
+    static ChannelHandlerContext attachUnderlay(final IoOutputStream out, final TransportChannel underlay,
+            final ChannelInactive inactive) {
+        // Note that there may be multiple handlers already present on the channel, hence we are attaching last, but
+        // from the logical perspective we are the head handlers.
+        final var pipeline = underlay.channel().pipeline();
+
+        // - install outbound packet handler, i.e. moving bytes from the channel into SSHD's pipeline
+        pipeline.addLast(new OutboundChannelHandler(out));
+        // - remember the context of this handler, we will be using it to issue writes into the channel
+        final var head = pipeline.lastContext();
+
+        // - install inner channel termination handler
+        pipeline.addLast(new ChannelInboundHandlerAdapter() {
+            @Override
+            public void channelInactive(final ChannelHandlerContext ctx) throws Exception {
+                inactive.onChannelInactive();
+            }
+        });
+
+        return head;
+    }
 }
index dfbf1db714d373fedca0ac2fd3401c0ed7c45d7d..a12cf39f9982bcf7f21c5eb3d6be49f94669486b 100644 (file)
@@ -11,8 +11,6 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertInstanceOf;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.timeout;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
@@ -55,10 +53,8 @@ import org.mockito.junit.jupiter.MockitoExtension;
 import org.opendaylight.netconf.shaded.sshd.client.session.ClientSession;
 import org.opendaylight.netconf.shaded.sshd.common.session.Session;
 import org.opendaylight.netconf.shaded.sshd.server.auth.password.UserAuthPasswordFactory;
-import org.opendaylight.netconf.shaded.sshd.server.command.Command;
 import org.opendaylight.netconf.shaded.sshd.server.keyprovider.SimpleGeneratorHostKeyProvider;
 import org.opendaylight.netconf.shaded.sshd.server.session.ServerSession;
-import org.opendaylight.netconf.shaded.sshd.server.subsystem.SubsystemFactory;
 import org.opendaylight.netconf.transport.api.TransportChannel;
 import org.opendaylight.netconf.transport.api.TransportChannelListener;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.Host;
@@ -97,10 +93,6 @@ public class SshClientServerTest {
     private SshServerGrouping sshServerConfig;
     @Mock
     private TransportChannelListener serverListener;
-    @Mock
-    private SubsystemFactory subsystemFactory;
-    @Mock
-    private Command subsystem;
 
     @Captor
     ArgumentCaptor<TransportChannel> clientTransportChannelCaptor;
@@ -137,9 +129,6 @@ public class SshClientServerTest {
         when(tcpClientConfig.requireRemoteAddress()).thenCallRealMethod();
         when(tcpClientConfig.getRemotePort()).thenReturn(localPort);
         when(tcpClientConfig.requireRemotePort()).thenCallRealMethod();
-
-        doReturn("subsystem").when(subsystemFactory).getName();
-        doReturn(subsystem).when(subsystemFactory).createSubsystem(any());
     }
 
     @ParameterizedTest(name = "SSH Server Host Key Verification -- {0}")
@@ -228,7 +217,7 @@ public class SshClientServerTest {
 
     private void integrationTest() throws Exception {
         // start server
-        final var server = FACTORY.listenServer(serverListener, subsystemFactory, tcpServerConfig, sshServerConfig)
+        final var server = FACTORY.listenServer("subsystem", serverListener, tcpServerConfig, sshServerConfig)
             .get(2, TimeUnit.SECONDS);
         try {
             // connect with client
@@ -265,7 +254,7 @@ public class SshClientServerTest {
         // Accept all keys
         when(sshClientConfig.getServerAuthentication()).thenReturn(null);
 
-        final var server = FACTORY.listenServer(serverListener, subsystemFactory, tcpServerConfig, null,
+        final var server = FACTORY.listenServer("subsystem", serverListener, tcpServerConfig, null,
             factoryManager -> {
                 // authenticate user by credentials and generate host key
                 factoryManager.setUserAuthFactories(List.of(new UserAuthPasswordFactory()));