import java.util.List;
import java.util.concurrent.ExecutionException;
import org.opendaylight.netconf.auth.AuthProvider;
-import org.opendaylight.netconf.server.BaseTransportChannelListener;
-import org.opendaylight.netconf.server.NetconfSubsystemFactory;
import org.opendaylight.netconf.server.ServerChannelInitializer;
+import org.opendaylight.netconf.server.ServerTransportInitializer;
import org.opendaylight.netconf.shaded.sshd.server.auth.password.UserAuthPasswordFactory;
import org.opendaylight.netconf.shaded.sshd.server.keyprovider.SimpleGeneratorHostKeyProvider;
import org.opendaylight.netconf.transport.api.UnsupportedConfigurationException;
*/
@Component(service = { }, configurationPid = "org.opendaylight.netconf.ssh")
@Designate(ocd = SshServerTransport.Configuration.class)
-public final class SshServerTransport extends BaseTransportChannelListener implements AutoCloseable {
+public final class SshServerTransport implements AutoCloseable {
@ObjectClassDefinition
public @interface Configuration {
@AttributeDefinition
final var localPort = listenParams.requireLocalPort().getValue();
try {
- sshServer = factoryHolder.factory().listenServer(this, new NetconfSubsystemFactory(initializer),
+ sshServer = factoryHolder.factory().listenServer("netconf", new ServerTransportInitializer(initializer),
listenParams, null, factoryMgr -> {
factoryMgr.setUserAuthFactories(List.of(UserAuthPasswordFactory.INSTANCE));
factoryMgr.setPasswordAuthenticator(
package org.opendaylight.netconf.northbound;
import java.util.concurrent.ExecutionException;
-import org.opendaylight.netconf.server.BaseServerTransport;
import org.opendaylight.netconf.server.ServerChannelInitializer;
+import org.opendaylight.netconf.server.ServerTransportInitializer;
import org.opendaylight.netconf.transport.api.UnsupportedConfigurationException;
import org.opendaylight.netconf.transport.tcp.TCPServer;
import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.IetfInetUtil;
*/
@Component(service = {}, configurationPid = "org.opendaylight.netconf.tcp", enabled = false)
@Designate(ocd = TcpServerTransport.Configuration.class)
-public final class TcpServerTransport extends BaseServerTransport implements AutoCloseable {
+public final class TcpServerTransport implements AutoCloseable {
@ObjectClassDefinition
public @interface Configuration {
@AttributeDefinition
public TcpServerTransport(final TransportFactoryHolder factoryHolder, final ServerChannelInitializer initializer,
final TcpServerGrouping listenParams) {
- super(initializer);
-
final var localAddr = listenParams.requireLocalAddress().stringValue();
final var localPort = listenParams.requireLocalPort().getValue();
try {
- tcpServer = TCPServer.listen(this, factoryHolder.factory().newServerBootstrap(), listenParams).get();
+ tcpServer = TCPServer.listen(new ServerTransportInitializer(initializer),
+ factoryHolder.factory().newServerBootstrap(), listenParams).get();
} catch (UnsupportedConfigurationException | ExecutionException | InterruptedException e) {
LOG.warn("Could not start TCP NETCONF server at {}:{}", localAddr, localPort, e);
throw new IllegalStateException("Could not start TCP NETCONF server", e);
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.channel.SimpleChannelInboundHandler;
-import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.handler.codec.MessageToByteEncoder;
import java.io.EOFException;
}
});
- // FIXME: NETCONF-1106: this is a workaround for netconf-server's NetconfSubsystem using EmbeddedChannel instead
- // of correctly integrating with the underlying transport channel
- if (channel instanceof EmbeddedChannel embeddedChannel) {
- // Embedded event loop implementation has no executor, it requires explicit invocation to process
- synchronized (channel) {
- embeddedChannel.runPendingTasks();
- }
- }
return promise;
}
<groupId>io.netty</groupId>
<artifactId>netty-codec</artifactId>
</dependency>
- <dependency>
- <groupId>io.netty</groupId>
- <artifactId>netty-buffer</artifactId>
- </dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-common</artifactId>
+++ /dev/null
-/*
- * Copyright (c) 2023 PANTHEON.tech, s.r.o. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.netconf.server;
-
-import static java.util.Objects.requireNonNull;
-
-import org.opendaylight.netconf.transport.api.TransportChannel;
-
-/**
- * Abstract base class for NETCONF server implementations working on top of a {@link TransportChannel}.
- */
-public class BaseServerTransport extends BaseTransportChannelListener {
- private final ServerChannelInitializer initializer;
-
- public BaseServerTransport(final ServerChannelInitializer initializer) {
- this.initializer = requireNonNull(initializer);
- }
-
- @Override
- public final void onTransportChannelEstablished(final TransportChannel channel) {
- super.onTransportChannelEstablished(channel);
- final var nettyChannel = channel.channel();
- initializer.initialize(nettyChannel, nettyChannel.eventLoop().newPromise());
- }
-}
import com.google.common.util.concurrent.ListenableFuture;
import org.opendaylight.netconf.server.api.NetconfServerFactory;
-import org.opendaylight.netconf.transport.api.TransportChannelListener;
import org.opendaylight.netconf.transport.api.UnsupportedConfigurationException;
import org.opendaylight.netconf.transport.ssh.SSHServer;
import org.opendaylight.netconf.transport.ssh.SSHTransportStackFactory;
import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.tcp.server.rev230417.TcpServerGrouping;
public final class NetconfServerFactoryImpl implements NetconfServerFactory {
- private static final TransportChannelListener EMPTY_LISTENER = new BaseTransportChannelListener();
-
private final SSHTransportStackFactory factory;
private final ServerChannelInitializer channelInitializer;
@Override
public ListenableFuture<TCPServer> createTcpServer(final TcpServerGrouping params)
throws UnsupportedConfigurationException {
- return TCPServer.listen(new BaseServerTransport(channelInitializer), factory.newServerBootstrap(), params);
+ return TCPServer.listen(new ServerTransportInitializer(channelInitializer), factory.newServerBootstrap(),
+ params);
}
@Override
public ListenableFuture<SSHServer> createSshServer(final TcpServerGrouping tcpParams,
final SshServerGrouping sshParams, final ServerFactoryManagerConfigurator configurator)
throws UnsupportedConfigurationException {
- return factory.listenServer(EMPTY_LISTENER, new NetconfSubsystemFactory(channelInitializer), tcpParams,
- sshParams, configurator);
+ return factory.listenServer("netconf", new ServerTransportInitializer(channelInitializer), tcpParams, sshParams,
+ configurator);
}
}
+++ /dev/null
-/*
- * Copyright (c) 2023 PANTHEON.tech, s.r.o. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.netconf.server;
-
-import static java.util.Objects.requireNonNull;
-
-import io.netty.buffer.Unpooled;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelInboundHandlerAdapter;
-import io.netty.channel.embedded.EmbeddedChannel;
-import io.netty.util.concurrent.GlobalEventExecutor;
-import java.net.InetSocketAddress;
-import java.nio.charset.StandardCharsets;
-import org.opendaylight.netconf.api.messages.NetconfHelloMessageAdditionalHeader;
-import org.opendaylight.netconf.shaded.sshd.common.io.IoInputStream;
-import org.opendaylight.netconf.shaded.sshd.common.io.IoOutputStream;
-import org.opendaylight.netconf.shaded.sshd.server.channel.ChannelDataReceiver;
-import org.opendaylight.netconf.shaded.sshd.server.channel.ChannelSession;
-import org.opendaylight.netconf.shaded.sshd.server.channel.ChannelSessionAware;
-import org.opendaylight.netconf.shaded.sshd.server.command.AbstractCommandSupport;
-import org.opendaylight.netconf.shaded.sshd.server.command.AsyncCommand;
-import org.opendaylight.netconf.transport.ssh.OutboundChannelHandler;
-
-final class NetconfSubsystem extends AbstractCommandSupport
- implements AsyncCommand, ChannelSessionAware, ChannelDataReceiver {
- // FIXME: NETCONF-1106: do not use EmbeddedChannel here!
- private final EmbeddedChannel innerChannel = new EmbeddedChannel();
- private final ServerChannelInitializer channelInitializer;
-
- NetconfSubsystem(final String name, final ServerChannelInitializer channelInitializer) {
- super(name, null);
- this.channelInitializer = requireNonNull(channelInitializer);
- }
-
- @Override
- public void run() {
- // not used
- }
-
- @Override
- public void setIoInputStream(final IoInputStream in) {
- // not used
- }
-
- @Override
- public void setIoErrorStream(final IoOutputStream err) {
- // not used
- }
-
- @Override
- public void setIoOutputStream(final IoOutputStream out) {
- /*
- * While NETCONF protocol handlers are designed to operate over Netty channel, the inner channel is used to
- * serve NETCONF over SSH.
- */
- // outbound packet handler, adding fist means it will be invoked last because of flow direction
- innerChannel.pipeline().addFirst(new OutboundChannelHandler(out));
-
- // inner channel termination handler
- innerChannel.pipeline().addLast(
- new ChannelInboundHandlerAdapter() {
- @Override
- public void channelInactive(final ChannelHandlerContext ctx) {
- onExit(0);
- }
- });
-
- // NETCONF protocol handlers
- channelInitializer.initialize(innerChannel, GlobalEventExecutor.INSTANCE.newPromise());
- // trigger negotiation flow
- innerChannel.pipeline().fireChannelActive();
- // set additional info for upcoming netconf session
- innerChannel.writeInbound(Unpooled.wrappedBuffer(getHelloAdditionalMessageBytes()));
- }
-
- @Override
- public void setChannelSession(final ChannelSession channelSession) {
- /*
- * Inbound packets handler
- * NOTE: The channel data receiver require to be set within current method, so it could be handled
- * with subsequent logic of ChannelSession#prepareChannelCommand() where this method is executed from.
- */
- channelSession.setDataReceiver(this);
- }
-
- @Override
- public int data(final ChannelSession channel, final byte[] buf, final int start, final int len) {
- // Do not propagate empty invocations
- if (len != 0) {
- innerChannel.writeInbound(Unpooled.copiedBuffer(buf, start, len));
- }
- return len;
- }
-
- @Override
- public void close() {
- innerChannel.close();
- }
-
- @Override
- protected void onExit(final int exitValue, final String exitMessage) {
- super.onExit(exitValue, exitMessage);
- innerChannel.close();
- }
-
- private byte[] getHelloAdditionalMessageBytes() {
- final var session = getServerSession();
- final var address = (InetSocketAddress) session.getClientAddress();
- return new NetconfHelloMessageAdditionalHeader(session.getUsername(), address.getAddress().getHostAddress(),
- String.valueOf(address.getPort()), "ssh", "client")
- .toFormattedString().getBytes(StandardCharsets.UTF_8);
- }
-}
\ No newline at end of file
+++ /dev/null
-/*
- * Copyright (c) 2023 PANTHEON.tech s.r.o. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.netconf.server;
-
-import static java.util.Objects.requireNonNull;
-
-import java.io.IOException;
-import org.opendaylight.netconf.shaded.sshd.server.channel.ChannelSession;
-import org.opendaylight.netconf.shaded.sshd.server.command.Command;
-import org.opendaylight.netconf.shaded.sshd.server.subsystem.SubsystemFactory;
-
-public final class NetconfSubsystemFactory implements SubsystemFactory {
- private static final String NETCONF = "netconf";
-
- private final ServerChannelInitializer channelInitializer;
-
- public NetconfSubsystemFactory(final ServerChannelInitializer channelInitializer) {
- this.channelInitializer = requireNonNull(channelInitializer);
- }
-
- @Override
- public String getName() {
- return NETCONF;
- }
-
- @Override
- public Command createSubsystem(final ChannelSession channel) throws IOException {
- return new NetconfSubsystem(NETCONF, channelInitializer);
- }
-}
*/
package org.opendaylight.netconf.server;
+import static java.util.Objects.requireNonNull;
+
import org.opendaylight.netconf.transport.api.TransportChannel;
import org.opendaylight.netconf.transport.api.TransportChannelListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class BaseTransportChannelListener implements TransportChannelListener {
- private static final Logger LOG = LoggerFactory.getLogger(BaseTransportChannelListener.class);
+/**
+ * A {@link TransportChannelListener} which initializes NETCONF server implementations working on top
+ * of a {@link TransportChannel}.
+ */
+public final class ServerTransportInitializer implements TransportChannelListener {
+ private static final Logger LOG = LoggerFactory.getLogger(ServerTransportInitializer.class);
+
+ private final ServerChannelInitializer initializer;
+
+ public ServerTransportInitializer(final ServerChannelInitializer initializer) {
+ this.initializer = requireNonNull(initializer);
+ }
@Override
public void onTransportChannelEstablished(final TransportChannel channel) {
LOG.debug("Transport channel {} established", channel);
+ final var nettyChannel = channel.channel();
+ initializer.initialize(nettyChannel, nettyChannel.eventLoop().newPromise());
}
@Override
- public final void onTransportChannelFailed(final Throwable cause) {
+ public void onTransportChannelFailed(final Throwable cause) {
LOG.error("Transport channel failed", cause);
}
}
* A ChannelOutboundHandler responsible for redirecting whatever bytes need to be written out on the Netty channel so
* that they pass into SSHD's output.
*/
-// FIXME: NETCONF-1106: hide this class if possible
-public final class OutboundChannelHandler extends ChannelOutboundHandlerAdapter {
+final class OutboundChannelHandler extends ChannelOutboundHandlerAdapter {
private static final Logger LOG = LoggerFactory.getLogger(OutboundChannelHandler.class);
private final IoOutputStream out;
- public OutboundChannelHandler(final IoOutputStream out) {
+ OutboundChannelHandler(final IoOutputStream out) {
this.out = requireNonNull(out);
}
final var sessionId = sessionId(session);
LOG.debug("Opening \"{}\" subsystem on session {}", subsystem, sessionId);
- final var underlay = underlayOf(sessionId);
- if (underlay == null) {
- throw new IOException("Cannot find underlay for " + session);
- }
-
+ final var underlay = getUnderlayOf(sessionId);
final var clientSession = cast(session);
final var channel = clientSession.createSubsystemChannel(subsystem);
channel.onClose(() -> clientSession.close(true));
private void onSubsystemOpenComplete(final OpenFuture future, final Long sessionId) {
if (future.isOpened()) {
- LOG.debug("Established transport on session {}", sessionId);
- completeUnderlay(sessionId, underlay -> addTransportChannel(new SSHTransportChannel(underlay)));
+ transportEstablished(sessionId);
} else {
LOG.error("Failed to establish transport on session {}", sessionId, future.getException());
deleteSession(sessionId);
}
private static TransportClientSession cast(final Session session) throws IOException {
- if (session instanceof TransportClientSession clientSession) {
- return clientSession;
- }
- throw new IOException("Unexpected session " + session);
+ return TransportUtils.checkCast(TransportClientSession.class, session);
}
}
*/
package org.opendaylight.netconf.transport.ssh;
+import static java.util.Objects.requireNonNull;
+
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.EventLoopGroup;
+import java.io.IOException;
import org.eclipse.jdt.annotation.NonNull;
import org.opendaylight.netconf.shaded.sshd.common.session.Session;
import org.opendaylight.netconf.shaded.sshd.netty.NettyIoServiceFactoryFactory;
-import org.opendaylight.netconf.shaded.sshd.server.subsystem.SubsystemFactory;
import org.opendaylight.netconf.transport.api.TransportChannelListener;
import org.opendaylight.netconf.transport.api.TransportStack;
import org.opendaylight.netconf.transport.api.UnsupportedConfigurationException;
import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.ssh.server.rev230417.SshServerGrouping;
import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.tcp.client.rev230417.TcpClientGrouping;
import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.tcp.server.rev230417.TcpServerGrouping;
+import org.opendaylight.yangtools.yang.common.Empty;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public final class SSHServer extends SSHTransportStack {
private static final Logger LOG = LoggerFactory.getLogger(SSHServer.class);
- private SSHServer(final TransportChannelListener listener, final TransportSshServer sshServer) {
+ private final String subsystem;
+
+ private SSHServer(final String subsystem, final TransportChannelListener listener,
+ final TransportSshServer sshServer) {
super(listener, sshServer, sshServer.getSessionFactory());
+ this.subsystem = requireNonNull(subsystem);
}
static SSHServer of(final NettyIoServiceFactoryFactory ioServiceFactory, final EventLoopGroup group,
- final TransportChannelListener listener, final SubsystemFactory subsystemFactory,
- final SshServerGrouping serverParams, final ServerFactoryManagerConfigurator configurator)
- throws UnsupportedConfigurationException {
- return new SSHServer(listener, new TransportSshServer.Builder(ioServiceFactory, group, subsystemFactory)
- .serverParams(serverParams)
- .configurator(configurator)
- .buildChecked());
+ final String subsystem, final TransportChannelListener listener, final SshServerGrouping serverParams,
+ final ServerFactoryManagerConfigurator configurator) throws UnsupportedConfigurationException {
+ return new SSHServer(subsystem, listener,
+ new TransportSshServer.Builder(ioServiceFactory, group)
+ .serverParams(serverParams)
+ .configurator(configurator)
+ .buildChecked());
}
@NonNull ListenableFuture<SSHServer> connect(final Bootstrap bootstrap, final TcpClientGrouping connectParams)
}
@Override
- void onAuthenticated(final Session session) {
+ void onAuthenticated(final Session session) throws IOException {
final var sessionId = sessionId(session);
- LOG.debug("Established transport on session {}", sessionId);
- // FIXME: we should wait for the subsystem to be created and then finish
- completeUnderlay(sessionId, underlay -> addTransportChannel(new SSHTransportChannel(underlay)));
+ LOG.debug("Awaiting \"{}\" subsystem on session {}", subsystem, sessionId);
+
+ Futures.addCallback(cast(session).attachUnderlay(subsystem, getUnderlayOf(sessionId)), new FutureCallback<>() {
+ @Override
+ public void onSuccess(final Empty result) {
+ // Note: we re-validating the underlay ... we may need to refactor state management to make this
+ // non-awkward
+ transportEstablished(sessionId);
+ }
+
+ @Override
+ public void onFailure(final Throwable cause) {
+ LOG.debug("Transport on session {} failed", sessionId, cause);
+ deleteSession(sessionId);
+ }
+ }, MoreExecutors.directExecutor());
+ }
+
+ private static TransportServerSession cast(final Session session) throws IOException {
+ return TransportUtils.checkCast(TransportServerSession.class, session);
}
-}
\ No newline at end of file
+}
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
-import org.eclipse.jdt.annotation.Nullable;
+import org.eclipse.jdt.annotation.NonNull;
import org.opendaylight.netconf.shaded.sshd.common.FactoryManager;
import org.opendaylight.netconf.shaded.sshd.common.SshConstants;
import org.opendaylight.netconf.shaded.sshd.common.io.IoHandler;
abstract void onAuthenticated(Session session) throws IOException;
- final @Nullable TransportChannel underlayOf(final Long sessionId) {
- return underlays.get(sessionId);
+ final @NonNull TransportChannel getUnderlayOf(final Long sessionId) throws IOException {
+ final var ret = underlays.get(sessionId);
+ if (ret == null) {
+ throw new IOException("Cannot find underlay for " + sessionId);
+ }
+ return ret;
}
final void deleteSession(final Long sessionId) {
completeUnderlay(sessionId, underlay -> underlay.channel().close());
}
- final void completeUnderlay(final Long sessionId, final Consumer<TransportChannel> action) {
+ // FIXME: this should be an assertion, the channel should just be there
+ final void transportEstablished(final Long sessionId) {
+ completeUnderlay(sessionId, underlay -> {
+ LOG.debug("Established transport on session {}", sessionId);
+ addTransportChannel(new SSHTransportChannel(underlay));
+ });
+ }
+
+ private void completeUnderlay(final Long sessionId, final Consumer<TransportChannel> action) {
final var removed = underlays.remove(sessionId);
if (removed != null) {
action.accept(removed);
import io.netty.channel.EventLoopGroup;
import org.eclipse.jdt.annotation.NonNull;
import org.opendaylight.netconf.shaded.sshd.netty.NettyIoServiceFactoryFactory;
-import org.opendaylight.netconf.shaded.sshd.server.subsystem.SubsystemFactory;
import org.opendaylight.netconf.transport.api.TransportChannelListener;
import org.opendaylight.netconf.transport.api.UnsupportedConfigurationException;
import org.opendaylight.netconf.transport.tcp.NettyTransportSupport;
.listen(newServerBootstrap(), listenParams);
}
- public @NonNull ListenableFuture<SSHServer> connectServer(final TransportChannelListener listener,
- final SubsystemFactory subsystemFactory, final TcpClientGrouping connectParams,
+ public @NonNull ListenableFuture<SSHServer> connectServer(final String subsystem,
+ final TransportChannelListener listener, final TcpClientGrouping connectParams,
final SshServerGrouping serverParams) throws UnsupportedConfigurationException {
- return SSHServer.of(ioServiceFactory, group, listener, subsystemFactory, requireNonNull(serverParams), null)
+ return SSHServer.of(ioServiceFactory, group, subsystem, listener, requireNonNull(serverParams), null)
.connect(newBootstrap(), connectParams);
}
- public @NonNull ListenableFuture<SSHServer> listenServer(final TransportChannelListener listener,
- final SubsystemFactory subsystemFactory, final TcpServerGrouping connectParams,
+ public @NonNull ListenableFuture<SSHServer> listenServer(final String subsystem,
+ final TransportChannelListener listener, final TcpServerGrouping connectParams,
final SshServerGrouping serverParams) throws UnsupportedConfigurationException {
- return listenServer(listener, subsystemFactory, connectParams, requireNonNull(serverParams), null);
+ return listenServer(subsystem, listener, connectParams, requireNonNull(serverParams), null);
}
/**
* Builds and starts SSH Server.
*
* @param listener server channel listener, required
- * @param subsystemFactory A {@link SubsystemFactory} for the hosted subsystem
+ * @param subsystem bound subsystem name
* @param listenParams TCP transport configuration, required
* @param serverParams SSH overlay configuration, optional if configurator is defined, required otherwise
* @param configurator server factory manager configurator, optional if serverParams is defined, required otherwise
* @throws NullPointerException if any of required parameters is null
* @throws IllegalArgumentException if both configurator and serverParams are null
*/
- public @NonNull ListenableFuture<SSHServer> listenServer(final TransportChannelListener listener,
- final SubsystemFactory subsystemFactory, final TcpServerGrouping listenParams,
+ public @NonNull ListenableFuture<SSHServer> listenServer(final String subsystem,
+ final TransportChannelListener listener, final TcpServerGrouping listenParams,
final SshServerGrouping serverParams, final ServerFactoryManagerConfigurator configurator)
- throws UnsupportedConfigurationException {
+ throws UnsupportedConfigurationException {
checkArgument(serverParams != null || configurator != null,
"Neither server parameters nor factory configurator is defined");
- return SSHServer.of(ioServiceFactory, group, listener, subsystemFactory, serverParams, configurator)
+ return SSHServer.of(ioServiceFactory, group, subsystem, listener, serverParams, configurator)
.listen(newServerBootstrap(), listenParams);
}
--- /dev/null
+/*
+ * Copyright (c) 2023 PANTHEON.tech, s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.netconf.transport.ssh;
+
+import static java.util.Objects.requireNonNull;
+
+import java.io.IOException;
+import org.opendaylight.netconf.shaded.sshd.common.channel.RequestHandler;
+import org.opendaylight.netconf.shaded.sshd.server.channel.ChannelSession;
+
+/**
+ * Our own version of {@link ChannelSession}, bound to a backend Netty channel.
+ */
+final class TransportChannelSession extends ChannelSession {
+ private final TransportServerSession serverSession;
+
+ TransportChannelSession(final TransportServerSession serverSession) {
+ this.serverSession = requireNonNull(serverSession);
+ }
+
+ @Override
+ protected RequestHandler.Result handleSubsystemParsed(final String request, final String subsystem)
+ throws IOException {
+ final var openSubsystem = serverSession.openSubsystem(subsystem);
+ if (openSubsystem == null) {
+ log.warn("handleSubsystemParsed({}) Unsupported subsystem: {}", this, subsystem);
+ return RequestHandler.Result.ReplyFailure;
+ }
+
+ commandInstance = openSubsystem;
+ return prepareChannelCommand(request, commandInstance);
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2023 PANTHEON.tech, s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.netconf.transport.ssh;
+
+import java.io.IOException;
+import org.opendaylight.netconf.shaded.sshd.common.channel.ChannelFactory;
+import org.opendaylight.netconf.shaded.sshd.common.session.Session;
+
+/**
+ * A {@link ChannelFactory} used with {@link TransportServerSession}s.
+ */
+final class TransportChannelSessionFactory implements ChannelFactory {
+ static final TransportChannelSessionFactory INSTANCE = new TransportChannelSessionFactory();
+
+ private TransportChannelSessionFactory() {
+ // Hidden on purpose
+ }
+
+ @Override
+ public String getName() {
+ // mimic ChannelSessionFactory without referencing it
+ return "session";
+ }
+
+ @Override
+ public TransportChannelSession createChannel(final Session session) throws IOException {
+ return new TransportChannelSession(TransportUtils.checkCast(TransportServerSession.class, session));
+ }
+}
import com.google.errorprone.annotations.DoNotCall;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelInboundHandlerAdapter;
import java.io.IOException;
import org.opendaylight.netconf.shaded.sshd.client.channel.ChannelSubsystem;
import org.opendaylight.netconf.shaded.sshd.client.future.OpenFuture;
final class TransportClientSubsystem extends ChannelSubsystem {
private static final Logger LOG = LoggerFactory.getLogger(TransportClientSubsystem.class);
- private ChannelHandlerContext pipelineHead;
+ private ChannelHandlerContext head;
TransportClientSubsystem(final String subsystem) {
super(subsystem);
}
private void onOpenComplete(final OpenFuture future, final TransportChannel underlay) {
- if (!future.isOpened()) {
+ if (future.isOpened()) {
+ head = TransportUtils.attachUnderlay(getAsyncIn(), underlay, this::close);
+ } else {
LOG.debug("Failed to open client subsystem \"{}\"", getSubsystem(), future.getException());
- return;
}
-
- // Note that there may be multiple handlers already present on the channel, hence we are attaching last, but
- // from the logical perspective we are the head handlers.
- final var pipeline = underlay.channel().pipeline();
-
- // - install outbound packet handler, i.e. moving bytes from the channel into SSHD's pipeline
- pipeline.addLast(new OutboundChannelHandler(getAsyncIn()));
- // - remember the context of this handler, we will be using it to issue writes into the channel
- pipelineHead = pipeline.lastContext();
-
- // - install inner channel termination handler
- pipeline.addLast(new ChannelInboundHandlerAdapter() {
- @Override
- public void channelInactive(final ChannelHandlerContext ctx) throws IOException {
- close();
- }
- });
}
@Override
final int reqLen = (int) len;
if (reqLen > 0) {
LOG.debug("Forwarding {} bytes of data", reqLen);
- pipelineHead.fireChannelRead(Unpooled.copiedBuffer(data, off, reqLen));
+ head.fireChannelRead(Unpooled.copiedBuffer(data, off, reqLen));
getLocalWindow().release(reqLen);
}
}
--- /dev/null
+/*
+ * Copyright (c) 2023 PANTHEON.tech, s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.netconf.transport.ssh;
+
+import static java.util.Objects.requireNonNull;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.lang.invoke.VarHandle;
+import org.eclipse.jdt.annotation.Nullable;
+import org.opendaylight.netconf.shaded.sshd.common.io.IoSession;
+import org.opendaylight.netconf.shaded.sshd.server.session.ServerSessionImpl;
+import org.opendaylight.netconf.transport.api.TransportChannel;
+import org.opendaylight.yangtools.yang.common.Empty;
+
+/**
+ * A {@link ServerSessionImpl}, bound to a backend Netty channel.
+ */
+final class TransportServerSession extends ServerSessionImpl {
+ private record State(String subsystem, TransportChannel underlay, SettableFuture<Empty> future) {
+ State {
+ subsystem = requireNonNull(subsystem);
+ underlay = requireNonNull(underlay);
+ future = requireNonNull(future);
+ }
+ }
+
+ private static final VarHandle STATE;
+
+ static {
+ try {
+ STATE = MethodHandles.lookup().findVarHandle(TransportServerSession.class, "state", State.class);
+ } catch (NoSuchFieldException | IllegalAccessException e) {
+ throw new ExceptionInInitializerError(e);
+ }
+ }
+
+ @SuppressWarnings("unused")
+ private volatile State state;
+
+ TransportServerSession(final TransportSshServer server, final IoSession ioSession) throws Exception {
+ super(server, ioSession);
+ }
+
+ ListenableFuture<Empty> attachUnderlay(final String subsystem, final TransportChannel underlay) {
+ final var newState = new State(subsystem, underlay, SettableFuture.create());
+ final var witness = STATE.compareAndExchange(this, null, newState);
+ if (witness != null) {
+ throw new IllegalStateException("Already set up for " + witness);
+ }
+ return newState.future;
+ }
+
+ @Nullable TransportServerSubsystem openSubsystem(final String subsystem) {
+ final var local = (State) STATE.getAndSet(this, null);
+ if (local != null) {
+ if (subsystem.equals(local.subsystem)) {
+ final var ret = new TransportServerSubsystem(subsystem, local.underlay);
+ local.future.set(Empty.value());
+ return ret;
+ }
+ local.future.setException(new IOException("Mismatched subsystem " + subsystem));
+ }
+ return null;
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2023 PANTHEON.tech, s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.netconf.transport.ssh;
+
+import org.opendaylight.netconf.shaded.sshd.common.io.IoSession;
+import org.opendaylight.netconf.shaded.sshd.server.session.SessionFactory;
+
+/**
+ * A {@link SessionFactory} tied to a {@link TransportSshServer}.
+ */
+final class TransportServerSessionFactory extends SessionFactory {
+ TransportServerSessionFactory(final TransportSshServer server) {
+ super(server);
+ }
+
+ @Override
+ protected TransportServerSession doCreateSession(final IoSession ioSession) throws Exception {
+ return new TransportServerSession((TransportSshServer) getServer(), ioSession);
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2023 PANTHEON.tech, s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.netconf.transport.ssh;
+
+import static java.util.Objects.requireNonNull;
+
+import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelHandlerContext;
+import org.opendaylight.netconf.shaded.sshd.common.io.IoInputStream;
+import org.opendaylight.netconf.shaded.sshd.common.io.IoOutputStream;
+import org.opendaylight.netconf.shaded.sshd.server.channel.ChannelDataReceiver;
+import org.opendaylight.netconf.shaded.sshd.server.channel.ChannelSession;
+import org.opendaylight.netconf.shaded.sshd.server.channel.ChannelSessionAware;
+import org.opendaylight.netconf.shaded.sshd.server.command.AbstractCommandSupport;
+import org.opendaylight.netconf.shaded.sshd.server.command.AsyncCommand;
+import org.opendaylight.netconf.transport.api.TransportChannel;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+final class TransportServerSubsystem extends AbstractCommandSupport
+ implements AsyncCommand, ChannelSessionAware, ChannelDataReceiver {
+ private static final Logger LOG = LoggerFactory.getLogger(TransportServerSubsystem.class);
+
+ private final TransportChannel underlay;
+
+ private ChannelHandlerContext head;
+
+ TransportServerSubsystem(final String name, final TransportChannel underlay) {
+ super(name, null);
+ this.underlay = requireNonNull(underlay);
+ }
+
+ @Override
+ public void run() {
+ // not used
+ }
+
+ @Override
+ public void setIoInputStream(final IoInputStream in) {
+ // not used
+ }
+
+ @Override
+ public void setIoErrorStream(final IoOutputStream err) {
+ // not used
+ }
+
+ @Override
+ public void setIoOutputStream(final IoOutputStream out) {
+ head = TransportUtils.attachUnderlay(out, underlay, () -> onExit(0));
+ // set additional info for upcoming netconf session
+// .fireChannelRead(Unpooled.wrappedBuffer(getHelloAdditionalMessageBytes()));
+ }
+
+ @Override
+ public void setChannelSession(final ChannelSession channelSession) {
+ /*
+ * Inbound packets handler
+ * NOTE: The channel data receiver require to be set within current method, so it could be handled
+ * with subsequent logic of ChannelSession#prepareChannelCommand() where this method is executed from.
+ */
+ channelSession.setDataReceiver(this);
+ }
+
+ @Override
+ public int data(final ChannelSession channel, final byte[] buf, final int start, final int len) {
+ // Do not propagate empty invocations
+ if (len > 0) {
+ LOG.debug("Forwarding {} bytes of data", len);
+ head.fireChannelRead(Unpooled.copiedBuffer(buf, start, len));
+ }
+ return len;
+ }
+
+ @Override
+ public void close() {
+ // No-op?
+ }
+//
+// private byte[] getHelloAdditionalMessageBytes() {
+// final var session = getServerSession();
+// final var address = (InetSocketAddress) session.getClientAddress();
+// return new NetconfHelloMessageAdditionalHeader(session.getUsername(), address.getAddress().getHostAddress(),
+// String.valueOf(address.getPort()), "ssh", "client")
+// .toFormattedString().getBytes(StandardCharsets.UTF_8);
+// }
+}
\ No newline at end of file
import io.netty.channel.EventLoopGroup;
import java.security.PublicKey;
import java.util.List;
+import org.opendaylight.netconf.shaded.sshd.common.channel.ChannelFactory;
import org.opendaylight.netconf.shaded.sshd.common.keyprovider.KeyPairProvider;
import org.opendaylight.netconf.shaded.sshd.netty.NettyIoServiceFactoryFactory;
import org.opendaylight.netconf.shaded.sshd.server.ServerBuilder;
import org.opendaylight.netconf.shaded.sshd.server.auth.hostbased.UserAuthHostBasedFactory;
import org.opendaylight.netconf.shaded.sshd.server.auth.password.UserAuthPasswordFactory;
import org.opendaylight.netconf.shaded.sshd.server.auth.pubkey.UserAuthPublicKeyFactory;
-import org.opendaylight.netconf.shaded.sshd.server.subsystem.SubsystemFactory;
+import org.opendaylight.netconf.shaded.sshd.server.forward.DirectTcpipFactory;
import org.opendaylight.netconf.transport.api.UnsupportedConfigurationException;
import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.ssh.server.rev230417.SshServerGrouping;
import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.ssh.server.rev230417.ssh.server.grouping.ClientAuthentication;
* {@code ietf-netconf-server.yang} configuration.
*/
static final class Builder extends ServerBuilder {
+ private static final List<ChannelFactory> CHANNEL_FACTORIES = List.of(
+ TransportChannelSessionFactory.INSTANCE,
+ DirectTcpipFactory.INSTANCE);
+
private final NettyIoServiceFactoryFactory ioServiceFactory;
private final EventLoopGroup group;
- private final SubsystemFactory subsystemFactory;
private ServerFactoryManagerConfigurator configurator;
private ClientAuthentication clientAuthentication;
private ServerIdentity serverIdentity;
private Keepalives keepAlives;
- Builder(final NettyIoServiceFactoryFactory ioServiceFactory, final EventLoopGroup group,
- final SubsystemFactory subsystemFactory) {
+ Builder(final NettyIoServiceFactoryFactory ioServiceFactory, final EventLoopGroup group) {
this.ioServiceFactory = requireNonNull(ioServiceFactory);
this.group = requireNonNull(group);
- this.subsystemFactory = requireNonNull(subsystemFactory);
}
Builder serverParams(final SshServerGrouping serverParams) throws UnsupportedConfigurationException {
configurator.configureServerFactoryManager(ret);
}
- ret.setSubsystemFactories(List.of(subsystemFactory));
ret.setIoServiceFactoryFactory(ioServiceFactory);
ret.setScheduledExecutorService(group);
throw new UnsupportedConfigurationException("Inconsistent client configuration", e);
}
- ret.setSessionFactory(ret.createSessionFactory());
+ ret.setSessionFactory(new TransportServerSessionFactory(ret));
return ret;
}
@Override
protected ServerBuilder fillWithDefaultValues() {
+ if (channelFactories == null) {
+ channelFactories = CHANNEL_FACTORIES;
+ }
if (factory == null) {
factory = TransportSshServer::new;
}
*/
package org.opendaylight.netconf.transport.ssh;
+import static java.util.Objects.requireNonNull;
+
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import java.io.IOException;
import java.util.List;
import java.util.Map;
import org.eclipse.jdt.annotation.Nullable;
import org.opendaylight.netconf.shaded.sshd.common.NamedFactory;
import org.opendaylight.netconf.shaded.sshd.common.cipher.BuiltinCiphers;
import org.opendaylight.netconf.shaded.sshd.common.cipher.Cipher;
+import org.opendaylight.netconf.shaded.sshd.common.io.IoOutputStream;
import org.opendaylight.netconf.shaded.sshd.common.kex.BuiltinDHFactories;
import org.opendaylight.netconf.shaded.sshd.common.kex.KeyExchangeFactory;
import org.opendaylight.netconf.shaded.sshd.common.mac.BuiltinMacs;
import org.opendaylight.netconf.shaded.sshd.common.signature.BuiltinSignatures;
import org.opendaylight.netconf.shaded.sshd.common.signature.Signature;
import org.opendaylight.netconf.shaded.sshd.server.ServerBuilder;
+import org.opendaylight.netconf.transport.api.TransportChannel;
import org.opendaylight.netconf.transport.api.UnsupportedConfigurationException;
import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.iana.ssh._public.key.algs.rev220616.EcdsaSha2Nistp256;
import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.iana.ssh._public.key.algs.rev220616.EcdsaSha2Nistp384;
}
return builder.build();
}
+
+ static <T> T checkCast(final Class<T> clazz, final Object obj) throws IOException {
+ try {
+ return clazz.cast(requireNonNull(obj));
+ } catch (ClassCastException e) {
+ throw new IOException(e);
+ }
+ }
+
+ @FunctionalInterface
+ interface ChannelInactive {
+
+ void onChannelInactive() throws Exception;
+ }
+
+ static ChannelHandlerContext attachUnderlay(final IoOutputStream out, final TransportChannel underlay,
+ final ChannelInactive inactive) {
+ // Note that there may be multiple handlers already present on the channel, hence we are attaching last, but
+ // from the logical perspective we are the head handlers.
+ final var pipeline = underlay.channel().pipeline();
+
+ // - install outbound packet handler, i.e. moving bytes from the channel into SSHD's pipeline
+ pipeline.addLast(new OutboundChannelHandler(out));
+ // - remember the context of this handler, we will be using it to issue writes into the channel
+ final var head = pipeline.lastContext();
+
+ // - install inner channel termination handler
+ pipeline.addLast(new ChannelInboundHandlerAdapter() {
+ @Override
+ public void channelInactive(final ChannelHandlerContext ctx) throws Exception {
+ inactive.onChannelInactive();
+ }
+ });
+
+ return head;
+ }
}
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import org.opendaylight.netconf.shaded.sshd.client.session.ClientSession;
import org.opendaylight.netconf.shaded.sshd.common.session.Session;
import org.opendaylight.netconf.shaded.sshd.server.auth.password.UserAuthPasswordFactory;
-import org.opendaylight.netconf.shaded.sshd.server.command.Command;
import org.opendaylight.netconf.shaded.sshd.server.keyprovider.SimpleGeneratorHostKeyProvider;
import org.opendaylight.netconf.shaded.sshd.server.session.ServerSession;
-import org.opendaylight.netconf.shaded.sshd.server.subsystem.SubsystemFactory;
import org.opendaylight.netconf.transport.api.TransportChannel;
import org.opendaylight.netconf.transport.api.TransportChannelListener;
import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.Host;
private SshServerGrouping sshServerConfig;
@Mock
private TransportChannelListener serverListener;
- @Mock
- private SubsystemFactory subsystemFactory;
- @Mock
- private Command subsystem;
@Captor
ArgumentCaptor<TransportChannel> clientTransportChannelCaptor;
when(tcpClientConfig.requireRemoteAddress()).thenCallRealMethod();
when(tcpClientConfig.getRemotePort()).thenReturn(localPort);
when(tcpClientConfig.requireRemotePort()).thenCallRealMethod();
-
- doReturn("subsystem").when(subsystemFactory).getName();
- doReturn(subsystem).when(subsystemFactory).createSubsystem(any());
}
@ParameterizedTest(name = "SSH Server Host Key Verification -- {0}")
private void integrationTest() throws Exception {
// start server
- final var server = FACTORY.listenServer(serverListener, subsystemFactory, tcpServerConfig, sshServerConfig)
+ final var server = FACTORY.listenServer("subsystem", serverListener, tcpServerConfig, sshServerConfig)
.get(2, TimeUnit.SECONDS);
try {
// connect with client
// Accept all keys
when(sshClientConfig.getServerAuthentication()).thenReturn(null);
- final var server = FACTORY.listenServer(serverListener, subsystemFactory, tcpServerConfig, null,
+ final var server = FACTORY.listenServer("subsystem", serverListener, tcpServerConfig, null,
factoryManager -> {
// authenticate user by credentials and generate host key
factoryManager.setUserAuthFactories(List.of(new UserAuthPasswordFactory()));