throws Exception {
final var server = serverFuture.get(2, TimeUnit.SECONDS);
try {
- final var client = FACTORY.connectClient(clientListener, tcpClientParams, sshClientParams)
+ final var client = FACTORY.connectClient("netconf", clientListener, tcpClientParams, sshClientParams)
.get(2, TimeUnit.SECONDS);
try {
// FIXME commented line requires netconf client to trigger netconf subsystem initialization on server
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-buffer</artifactId>
+ </dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-common</artifactId>
import io.netty.channel.EventLoopGroup;
import java.io.IOException;
import org.eclipse.jdt.annotation.NonNull;
-import org.opendaylight.netconf.shaded.sshd.client.session.ClientSession;
+import org.opendaylight.netconf.shaded.sshd.client.future.AuthFuture;
+import org.opendaylight.netconf.shaded.sshd.client.future.OpenFuture;
import org.opendaylight.netconf.shaded.sshd.common.session.Session;
import org.opendaylight.netconf.shaded.sshd.netty.NettyIoServiceFactoryFactory;
import org.opendaylight.netconf.transport.api.TransportChannelListener;
import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.ssh.client.rev230417.SshClientGrouping;
import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.tcp.client.rev230417.TcpClientGrouping;
import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.tcp.server.rev230417.TcpServerGrouping;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* A {@link TransportStack} acting as an SSH client.
*/
public final class SSHClient extends SSHTransportStack {
- private SSHClient(final TransportChannelListener listener, final TransportSshClient sshClient) {
+ private static final Logger LOG = LoggerFactory.getLogger(SSHClient.class);
+
+ private final String subsystem;
+
+ private SSHClient(final String subsystem, final TransportChannelListener listener,
+ final TransportSshClient sshClient) {
super(listener, sshClient, sshClient.getSessionFactory());
+ // Mirrors check in ChannelSubsystem's constructor
+ if (subsystem.isBlank()) {
+ throw new IllegalArgumentException("Blank subsystem");
+ }
+ this.subsystem = subsystem;
}
static SSHClient of(final NettyIoServiceFactoryFactory ioServiceFactory, final EventLoopGroup group,
- final TransportChannelListener listener, final SshClientGrouping clientParams)
+ final String subsystem, final TransportChannelListener listener, final SshClientGrouping clientParams)
throws UnsupportedConfigurationException {
- return new SSHClient(listener, new TransportSshClient.Builder(ioServiceFactory, group)
+ return new SSHClient(subsystem, listener, new TransportSshClient.Builder(ioServiceFactory, group)
.transportParams(clientParams.getTransportParams())
.keepAlives(clientParams.getKeepalives())
.clientIdentity(clientParams.getClientIdentity())
@Override
void onKeyEstablished(final Session session) throws IOException {
- if (!(session instanceof ClientSession clientSession)) {
- throw new IOException("Unexpected session " + session);
+ // server key is accepted, trigger authentication flow
+ final var sessionId = sessionId(session);
+ LOG.debug("Authenticating session {}", sessionId);
+ cast(session).auth().addListener(future -> onAuthComplete(future, sessionId));
+ }
+
+ private void onAuthComplete(final AuthFuture future, final Long sessionId) {
+ if (!future.isSuccess()) {
+ LOG.info("Session {} authentication failed", sessionId);
+ deleteSession(sessionId);
+ } else {
+ LOG.debug("Session {} authenticated", sessionId);
}
+ }
- // server key is accepted, trigger authentication flow
- clientSession.auth().addListener(future -> {
- if (!future.isSuccess()) {
- deleteSession(session);
- }
- });
+ @Override
+ void onAuthenticated(final Session session) throws IOException {
+ final var sessionId = sessionId(session);
+ LOG.debug("Opening \"{}\" subsystem on session {}", subsystem, sessionId);
+
+ final var underlay = underlayOf(sessionId);
+ if (underlay == null) {
+ throw new IOException("Cannot find underlay for " + session);
+ }
+
+ final var clientSession = cast(session);
+ final var channel = clientSession.createSubsystemChannel(subsystem);
+ channel.onClose(() -> clientSession.close(true));
+ channel.open(underlay).addListener(future -> onSubsystemOpenComplete(future, sessionId));
+ }
+
+ private void onSubsystemOpenComplete(final OpenFuture future, final Long sessionId) {
+ if (future.isOpened()) {
+ LOG.debug("Established transport on session {}", sessionId);
+ completeUnderlay(sessionId, underlay -> addTransportChannel(new SSHTransportChannel(underlay)));
+ } else {
+ LOG.error("Failed to establish transport on session {}", sessionId, future.getException());
+ deleteSession(sessionId);
+ }
+ }
+
+ private static TransportClientSession cast(final Session session) throws IOException {
+ if (session instanceof TransportClientSession clientSession) {
+ return clientSession;
+ }
+ throw new IOException("Unexpected session " + session);
}
-}
\ No newline at end of file
+}
import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.ssh.server.rev230417.SshServerGrouping;
import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.tcp.client.rev230417.TcpClientGrouping;
import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.tcp.server.rev230417.TcpServerGrouping;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* A {@link TransportStack} acting as an SSH server.
*/
public final class SSHServer extends SSHTransportStack {
+ private static final Logger LOG = LoggerFactory.getLogger(SSHServer.class);
+
private SSHServer(final TransportChannelListener listener, final TransportSshServer sshServer) {
super(listener, sshServer, sshServer.getSessionFactory());
}
void onKeyEstablished(final Session session) {
// No-op
}
+
+ @Override
+ void onAuthenticated(final Session session) {
+ final var sessionId = sessionId(session);
+ LOG.debug("Established transport on session {}", sessionId);
+ // FIXME: we should wait for the subsystem to be created and then finish
+ completeUnderlay(sessionId, underlay -> addTransportChannel(new SSHTransportChannel(underlay)));
+ }
}
\ No newline at end of file
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
+import org.eclipse.jdt.annotation.Nullable;
import org.opendaylight.netconf.shaded.sshd.common.FactoryManager;
+import org.opendaylight.netconf.shaded.sshd.common.SshConstants;
import org.opendaylight.netconf.shaded.sshd.common.io.IoHandler;
import org.opendaylight.netconf.shaded.sshd.common.session.Session;
import org.opendaylight.netconf.shaded.sshd.common.session.SessionListener;
import org.opendaylight.netconf.transport.api.TransportChannel;
import org.opendaylight.netconf.transport.api.TransportChannelListener;
import org.opendaylight.netconf.transport.api.TransportStack;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
- * An SSH {@link TransportStack}. Instances of this class are built indirectly.
+ * An SSH {@link TransportStack}. Instances of this class are built indirectly. The setup of the Netty channel is quite
+ * weird. We start off with whatever the underlay sets up.
+ *
+ * <p>
+ * We then add {@link TransportIoSession#getHandler()}, which routes data between the socket and
+ * {@link TransportSshClient} (or {@link TransportSshServer}) -- forming the "bottom half" of the channel.
+ *
+ * <p>
+ * The "upper half" of the channel is formed once the corresponding SSH subsystem is established, via
+ * {@link TransportClientSubsystem}, which installs a {@link OutboundChannelHandler}. These two work together:
+ * <ul>
+ * <li>TransportClientSubsystem pumps bytes inbound from the subsystem towards the tail of the channel pipeline</li>
+ * <li>OutboundChannelHandler pumps bytes outbound from the tail of channel pipeline into the subsystem</li>
+ * </ul>
*/
public abstract sealed class SSHTransportStack extends AbstractOverlayTransportStack<SSHTransportChannel>
implements SessionListener permits SSHClient, SSHServer {
- private final Map<Long, TransportChannel> unauthUnderlays = new ConcurrentHashMap<>();
+ private static final Logger LOG = LoggerFactory.getLogger(SSHTransportStack.class);
+
+ // Underlay TransportChannels which do not have an open subsystem
+ private final Map<Long, TransportChannel> underlays = new ConcurrentHashMap<>();
private final Map<Long, Session> sessions = new ConcurrentHashMap<>();
private final TransportIoService ioService;
@Override
protected void onUnderlayChannelEstablished(final TransportChannel underlayChannel) {
+ LOG.debug("Underlay establishing, attaching SSH to {}", underlayChannel);
+ // Acquire underlying channel, create a TransportIoSession and attach its handler to this channel -- which takes
+ // care of routing bytes between the underlay channel and SSHD's network-facing side.
final var channel = underlayChannel.channel();
final var ioSession = ioService.createSession(channel.localAddress());
-
channel.pipeline().addLast(ioSession.getHandler());
- // authentication triggering and handlers processing is performed by UserAuthSessionListener
- unauthUnderlays.put(ioSession.getId(), underlayChannel);
+
+ // we now have an attached underlay, but it needs further processing before we expose it to the end user
+ underlays.put(ioSession.getId(), underlayChannel);
}
- /*
- * SessionListener integration. Responsible for session authentication for both client and server.
- *
- * <P>Triggers authentication flow when after server key is accepted by client,
- * invokes associated handler on authentication success/failure.
- */
+ // SessionListener integration. Responsible for observing authentication-related events, orchestrating both client
+ // and server interactions.
+ //
+ // The state machine is responsible for driving TransportChannel
+
+ //
+ // At some point we should keep this in an encapsulated state object, but we have specializations, so we keep this
+ // here at the cost of not modeling the solution domain correctly.
+
@Override
public final void sessionCreated(final Session session) {
- sessions.put(session.getIoSession().getId(), session);
+ sessions.put(sessionId(session), session);
}
@Override
public final void sessionException(final Session session, final Throwable throwable) {
- deleteSession(session);
+ final var sessionId = sessionId(session);
+ LOG.warn("Session {} encountered an error", sessionId, throwable);
+ deleteSession(sessionId);
}
@Override
public final void sessionDisconnect(final Session session, final int reason, final String msg,
final String language, final boolean initiator) {
- deleteSession(session);
+ final var sessionId = sessionId(session);
+ LOG.debug("Session {} disconnected: {}", sessionId, SshConstants.getDisconnectReasonName(reason));
+ deleteSession(sessionId);
}
@Override
public final void sessionClosed(final Session session) {
- deleteSession(session);
+ final var sessionId = sessionId(session);
+ LOG.debug("Session {} closed", sessionId);
+ deleteSession(sessionId);
}
@Override
public final void sessionEvent(final Session session, final Event event) {
+ final var sessionId = sessionId(session);
switch (event) {
- case Authenticated:
- onAuthenticated(session);
- break;
- case KeyEstablished:
+ case KeyEstablished -> {
+ LOG.debug("New key established on session {}", sessionId);
try {
onKeyEstablished(session);
} catch (IOException e) {
- sessionException(session, e);
+ LOG.error("Post-key step failed on session {}", sessionId, e);
+ deleteSession(sessionId);
}
- break;
- default:
- // No-op
+ }
+ case Authenticated -> {
+ LOG.debug("Authentication on session {} successful", sessionId);
+ try {
+ onAuthenticated(session);
+ } catch (IOException e) {
+ LOG.error("Post-authentication step failed on session {}", sessionId, e);
+ deleteSession(sessionId);
+ }
+ }
+ case KexCompleted -> {
+ LOG.debug("Key exchange completed on session {}", sessionId);
+ }
+ default -> {
+ LOG.debug("Ignoring event {} on session {}", event, sessionId);
+ }
}
}
abstract void onKeyEstablished(Session session) throws IOException;
- private void onAuthenticated(final Session session) {
- // auth success
- completeAuth(idOf(session), underlay -> addTransportChannel(new SSHTransportChannel(underlay)));
+ abstract void onAuthenticated(Session session) throws IOException;
+
+ final @Nullable TransportChannel underlayOf(final Long sessionId) {
+ return underlays.get(sessionId);
}
- final void deleteSession(final Session session) {
- final var id = idOf(session);
- sessions.remove(id);
+ final void deleteSession(final Long sessionId) {
+ sessions.remove(sessionId);
// auth failure, close underlay if any
- completeAuth(id, underlay -> underlay.channel().close());
+ completeUnderlay(sessionId, underlay -> underlay.channel().close());
}
- private void completeAuth(final Long sessionId, final Consumer<TransportChannel> action) {
- final var removed = unauthUnderlays.remove(sessionId);
+ final void completeUnderlay(final Long sessionId, final Consumer<TransportChannel> action) {
+ final var removed = underlays.remove(sessionId);
if (removed != null) {
action.accept(removed);
}
}
- private static Long idOf(final Session session) {
+ static final Long sessionId(final Session session) {
return session.getIoSession().getId();
}
NettyTransportSupport.newEventLoopGroup(parentGroupName, parentGroupThreads));
}
- public @NonNull ListenableFuture<SSHClient> connectClient(final TransportChannelListener listener,
- final TcpClientGrouping connectParams, final SshClientGrouping clientParams)
- throws UnsupportedConfigurationException {
- return SSHClient.of(ioServiceFactory, group, listener, clientParams).connect(newBootstrap(), connectParams);
+ public @NonNull ListenableFuture<SSHClient> connectClient(final String subsystem,
+ final TransportChannelListener listener, final TcpClientGrouping connectParams,
+ final SshClientGrouping clientParams) throws UnsupportedConfigurationException {
+ return SSHClient.of(ioServiceFactory, group, subsystem, listener, clientParams)
+ .connect(newBootstrap(), connectParams);
}
- public @NonNull ListenableFuture<SSHClient> listenClient(final TransportChannelListener listener,
- final TcpServerGrouping listenParams, final SshClientGrouping clientParams)
- throws UnsupportedConfigurationException {
- return SSHClient.of(ioServiceFactory, group, listener, clientParams).listen(newServerBootstrap(), listenParams);
+ public @NonNull ListenableFuture<SSHClient> listenClient(final String subsystem,
+ final TransportChannelListener listener, final TcpServerGrouping listenParams,
+ final SshClientGrouping clientParams) throws UnsupportedConfigurationException {
+ return SSHClient.of(ioServiceFactory, group, subsystem, listener, clientParams)
+ .listen(newServerBootstrap(), listenParams);
}
public @NonNull ListenableFuture<SSHServer> connectServer(final TransportChannelListener listener,
--- /dev/null
+/*
+ * Copyright (c) 2023 PANTHEON.tech, s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.netconf.transport.ssh;
+
+import java.io.IOException;
+import org.opendaylight.netconf.shaded.sshd.client.session.ClientSessionImpl;
+import org.opendaylight.netconf.shaded.sshd.common.io.IoSession;
+
+/**
+ * Our own version of {@link ClientSessionImpl}, bound to a backend Netty channel.
+ */
+final class TransportClientSession extends ClientSessionImpl {
+ TransportClientSession(final TransportSshClient client, final IoSession ioSession) throws Exception {
+ super(client, ioSession);
+ }
+
+ @Override
+ public TransportClientSubsystem createSubsystemChannel(final String subsystem) throws IOException {
+ final var channel = new TransportClientSubsystem(subsystem);
+ final var service = getConnectionService();
+ final var id = service.registerChannel(channel);
+ if (log.isDebugEnabled()) {
+ log.debug("createSubsystemChannel({})[{}] created id={}", this, subsystem, id);
+ }
+ return channel;
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2023 PANTHEON.tech, s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.netconf.transport.ssh;
+
+import static java.util.Objects.requireNonNull;
+
+import org.opendaylight.netconf.shaded.sshd.client.session.SessionFactory;
+import org.opendaylight.netconf.shaded.sshd.common.io.IoSession;
+
+/**
+ * A {@link SessionFactory} producing {@link TransportClientSession}s for a particular user.
+ */
+final class TransportClientSessionFactory extends SessionFactory {
+ private final String username;
+
+ TransportClientSessionFactory(final TransportSshClient client, final String username) {
+ super(client);
+ this.username = requireNonNull(username);
+ }
+
+ @Override
+ protected TransportClientSession doCreateSession(final IoSession ioSession) throws Exception {
+ final var ret = new TransportClientSession((TransportSshClient) getClient(), ioSession);
+ ret.setUsername(username);
+ return ret;
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2023 PANTHEON.tech, s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.netconf.transport.ssh;
+
+import com.google.errorprone.annotations.DoNotCall;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import java.io.IOException;
+import org.opendaylight.netconf.shaded.sshd.client.channel.ChannelSubsystem;
+import org.opendaylight.netconf.shaded.sshd.client.future.OpenFuture;
+import org.opendaylight.netconf.transport.api.TransportChannel;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link ChannelSubsystem} bound to a {@link SSHClient} and a Netty channel.
+ */
+final class TransportClientSubsystem extends ChannelSubsystem {
+ private static final Logger LOG = LoggerFactory.getLogger(TransportClientSubsystem.class);
+
+ private ChannelHandlerContext pipelineHead;
+
+ TransportClientSubsystem(final String subsystem) {
+ super(subsystem);
+ setStreaming(Streaming.Async);
+ }
+
+ @Override
+ @Deprecated
+ @DoNotCall("Always throws UnsupportedOperationException")
+ public OpenFuture open() throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ synchronized OpenFuture open(final TransportChannel underlay) throws IOException {
+ LOG.debug("Opening client subsystem \"{}\"", getSubsystem());
+ final var openFuture = super.open();
+ openFuture.addListener(future -> onOpenComplete(future, underlay));
+ return openFuture;
+ }
+
+ private void onOpenComplete(final OpenFuture future, final TransportChannel underlay) {
+ if (!future.isOpened()) {
+ LOG.debug("Failed to open client subsystem \"{}\"", getSubsystem(), future.getException());
+ return;
+ }
+
+ // Note that there may be multiple handlers already present on the channel, hence we are attaching last, but
+ // from the logical perspective we are the head handlers.
+ final var pipeline = underlay.channel().pipeline();
+
+ // - install outbound packet handler, i.e. moving bytes from the channel into SSHD's pipeline
+ pipeline.addLast(new OutboundChannelHandler(getAsyncIn()));
+ // - remember the context of this handler, we will be using it to issue writes into the channel
+ pipelineHead = pipeline.lastContext();
+
+ // - install inner channel termination handler
+ pipeline.addLast(new ChannelInboundHandlerAdapter() {
+ @Override
+ public void channelInactive(final ChannelHandlerContext ctx) throws IOException {
+ close();
+ }
+ });
+ }
+
+ @Override
+ protected void doWriteData(final byte[] data, final int off, final long len) throws IOException {
+ // If we're already closing, ignore incoming data
+ if (isClosing()) {
+ return;
+ }
+
+ final int reqLen = (int) len;
+ if (reqLen > 0) {
+ LOG.debug("Forwarding {} bytes of data", reqLen);
+ pipelineHead.fireChannelRead(Unpooled.copiedBuffer(data, off, reqLen));
+ getLocalWindow().release(reqLen);
+ }
+ }
+
+ @Override
+ protected void doWriteExtendedData(final byte[] data, final int off, final long len) throws IOException {
+ // If we're already closing, ignore incoming data
+ if (isClosing()) {
+ return;
+ }
+ LOG.debug("Discarding {} bytes of extended data", len);
+ if (len > 0) {
+ getLocalWindow().release(len);
+ }
+ }
+}
import org.opendaylight.netconf.shaded.sshd.client.auth.password.UserAuthPasswordFactory;
import org.opendaylight.netconf.shaded.sshd.client.auth.pubkey.UserAuthPublicKeyFactory;
import org.opendaylight.netconf.shaded.sshd.client.keyverifier.ServerKeyVerifier;
-import org.opendaylight.netconf.shaded.sshd.client.session.ClientSessionImpl;
-import org.opendaylight.netconf.shaded.sshd.client.session.SessionFactory;
import org.opendaylight.netconf.shaded.sshd.common.keyprovider.KeyIdentityProvider;
import org.opendaylight.netconf.shaded.sshd.netty.NettyIoServiceFactoryFactory;
import org.opendaylight.netconf.transport.api.UnsupportedConfigurationException;
throw new UnsupportedConfigurationException("Inconsistent client configuration", e);
}
- ret.setSessionFactory(new SessionFactory(ret) {
- @Override
- protected ClientSessionImpl setupSession(final ClientSessionImpl session) {
- session.setUsername(username);
- return session;
- }
- });
+ ret.setSessionFactory(new TransportClientSessionFactory(ret, username));
return ret;
}
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import org.opendaylight.netconf.shaded.sshd.client.session.ClientSession;
import org.opendaylight.netconf.shaded.sshd.common.session.Session;
import org.opendaylight.netconf.shaded.sshd.server.auth.password.UserAuthPasswordFactory;
+import org.opendaylight.netconf.shaded.sshd.server.command.Command;
import org.opendaylight.netconf.shaded.sshd.server.keyprovider.SimpleGeneratorHostKeyProvider;
import org.opendaylight.netconf.shaded.sshd.server.session.ServerSession;
import org.opendaylight.netconf.shaded.sshd.server.subsystem.SubsystemFactory;
private TransportChannelListener serverListener;
@Mock
private SubsystemFactory subsystemFactory;
+ @Mock
+ private Command subsystem;
@Captor
ArgumentCaptor<TransportChannel> clientTransportChannelCaptor;
when(tcpClientConfig.requireRemoteAddress()).thenCallRealMethod();
when(tcpClientConfig.getRemotePort()).thenReturn(localPort);
when(tcpClientConfig.requireRemotePort()).thenCallRealMethod();
+
+ doReturn("subsystem").when(subsystemFactory).getName();
+ doReturn(subsystem).when(subsystemFactory).createSubsystem(any());
}
@ParameterizedTest(name = "SSH Server Host Key Verification -- {0}")
.get(2, TimeUnit.SECONDS);
try {
// connect with client
- final var client = FACTORY.connectClient(clientListener, tcpClientConfig, sshClientConfig)
+ final var client = FACTORY.connectClient("subsystem", clientListener, tcpClientConfig, sshClientConfig)
.get(2, TimeUnit.SECONDS);
try {
verify(serverListener, timeout(10_000))
factoryManager.setKeyPairProvider(new SimpleGeneratorHostKeyProvider());
}).get(2, TimeUnit.SECONDS);
try {
- final var client = FACTORY.connectClient(clientListener, tcpClientConfig, sshClientConfig)
+ final var client = FACTORY.connectClient("subsystem", clientListener, tcpClientConfig, sshClientConfig)
.get(2, TimeUnit.SECONDS);
try {
verify(serverListener, timeout(10_000))