Re-integrate ssh client 34/89634/1
authorRobert Varga <robert.varga@pantheon.tech>
Fri, 27 Sep 2019 11:19:22 +0000 (13:19 +0200)
committerRobert Varga <robert.varga@pantheon.tech>
Thu, 7 May 2020 12:23:52 +0000 (14:23 +0200)
This re-wraps SshClient in a different way, allowing us to shuffle
information directly into netty.

JIRA: NETCONF-674
Change-Id: Iba0abf81fda90b45403b507c7a00ec9fad48c5a1
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
Signed-off-by: Tibor Král <tibor.kral@pantheon.tech>
Signed-off-by: Vladyslav Marchenko <vladyslav.marchenko@pantheon.tech>
(cherry picked from commit 7461000dc0c8263ae58f3f95e1e3081daf9f69fa)

12 files changed:
netconf/netconf-client/src/main/java/org/opendaylight/netconf/client/NetconfClientDispatcherImpl.java
netconf/netconf-client/src/main/java/org/opendaylight/netconf/client/SshClientChannelInitializer.java
netconf/netconf-netty-util/src/main/java/org/opendaylight/netconf/nettyutil/handler/ssh/client/AsyncSshHandler.java
netconf/netconf-netty-util/src/main/java/org/opendaylight/netconf/nettyutil/handler/ssh/client/NetconfClientBuilder.java [new file with mode: 0644]
netconf/netconf-netty-util/src/main/java/org/opendaylight/netconf/nettyutil/handler/ssh/client/NetconfClientSessionImpl.java [new file with mode: 0644]
netconf/netconf-netty-util/src/main/java/org/opendaylight/netconf/nettyutil/handler/ssh/client/NetconfSessionFactory.java [new file with mode: 0644]
netconf/netconf-netty-util/src/main/java/org/opendaylight/netconf/nettyutil/handler/ssh/client/NetconfSshClient.java [new file with mode: 0644]
netconf/netconf-netty-util/src/main/java/org/opendaylight/netconf/nettyutil/handler/ssh/client/NettyAwareChannelSubsystem.java [new file with mode: 0644]
netconf/netconf-netty-util/src/main/java/org/opendaylight/netconf/nettyutil/handler/ssh/client/NettyAwareClientSession.java [new file with mode: 0644]
netconf/netconf-netty-util/src/main/java/org/opendaylight/netconf/nettyutil/handler/ssh/client/package-info.java [new file with mode: 0644]
netconf/netconf-netty-util/src/test/java/org/opendaylight/netconf/nettyutil/handler/ssh/client/AsyncSshHandlerTest.java
netconf/tools/netconf-testtool/src/main/java/org/opendaylight/netconf/test/tool/client/stress/ConfigurableClientDispatcher.java

index 45986d8b899230dbeee8663276b29c509800c687..7e824792bac1d5ec3602b28874bef8e076b3b7e7 100644 (file)
@@ -17,7 +17,7 @@ import org.eclipse.jdt.annotation.Nullable;
 import org.opendaylight.netconf.client.conf.NetconfClientConfiguration;
 import org.opendaylight.netconf.client.conf.NetconfReconnectingClientConfiguration;
 import org.opendaylight.netconf.nettyutil.AbstractNetconfDispatcher;
-import org.opendaylight.netconf.shaded.sshd.client.SshClient;
+import org.opendaylight.netconf.nettyutil.handler.ssh.client.NetconfSshClient;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.Uri;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -29,10 +29,10 @@ public class NetconfClientDispatcherImpl
     private static final Logger LOG = LoggerFactory.getLogger(NetconfClientDispatcherImpl.class);
 
     private final Timer timer;
-    private final SshClient sshClient;
+    private final NetconfSshClient sshClient;
 
     public NetconfClientDispatcherImpl(final EventLoopGroup bossGroup, final EventLoopGroup workerGroup,
-                                       final Timer timer, @Nullable final SshClient sshClient) {
+                                       final Timer timer, @Nullable final NetconfSshClient sshClient) {
         super(bossGroup, workerGroup);
         this.timer = timer;
         this.sshClient = sshClient;
index 93f6d006e868c5d06d3297885f489f37d11b3099..53d9a27c962987a80d0ebc844aa213fbe3ccaf5d 100644 (file)
@@ -13,18 +13,17 @@ import org.eclipse.jdt.annotation.Nullable;
 import org.opendaylight.netconf.nettyutil.AbstractChannelInitializer;
 import org.opendaylight.netconf.nettyutil.handler.ssh.authentication.AuthenticationHandler;
 import org.opendaylight.netconf.nettyutil.handler.ssh.client.AsyncSshHandler;
-import org.opendaylight.netconf.shaded.sshd.client.SshClient;
+import org.opendaylight.netconf.nettyutil.handler.ssh.client.NetconfSshClient;
 
 final class SshClientChannelInitializer extends AbstractChannelInitializer<NetconfClientSession> {
-
     private final AuthenticationHandler authenticationHandler;
     private final NetconfClientSessionNegotiatorFactory negotiatorFactory;
     private final NetconfClientSessionListener sessionListener;
-    private final SshClient sshClient;
+    private final NetconfSshClient sshClient;
 
     SshClientChannelInitializer(final AuthenticationHandler authHandler,
             final NetconfClientSessionNegotiatorFactory negotiatorFactory,
-            final NetconfClientSessionListener sessionListener, @Nullable final SshClient sshClient) {
+            final NetconfClientSessionListener sessionListener, @Nullable final NetconfSshClient sshClient) {
         this.authenticationHandler = authHandler;
         this.negotiatorFactory = negotiatorFactory;
         this.sessionListener = sessionListener;
index 0af368cdf38e8ad3dd100a5bfd7831685fbb102e..1aeeaaf48dc716ff76f68a5944c95ed142f32ef6 100644 (file)
@@ -5,10 +5,11 @@
  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
  * and is available at http://www.eclipse.org/legal/epl-v10.html
  */
-
 package org.opendaylight.netconf.nettyutil.handler.ssh.client;
 
-import com.google.common.base.Preconditions;
+import static com.google.common.base.Verify.verify;
+import static java.util.Objects.requireNonNull;
+
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelOutboundHandlerAdapter;
 import io.netty.channel.ChannelPromise;
@@ -40,10 +41,10 @@ public class AsyncSshHandler extends ChannelOutboundHandlerAdapter {
     // Disable default timeouts from mina sshd
     private static final long DEFAULT_TIMEOUT = -1L;
 
-    public static final SshClient DEFAULT_CLIENT;
+    public static final NetconfSshClient DEFAULT_CLIENT;
 
     static {
-        final SshClient c = SshClient.setUpDefaultClient();
+        final NetconfSshClient c = new NetconfClientBuilder().build();
         c.getProperties().put(SshClient.AUTH_TIMEOUT, Long.toString(DEFAULT_TIMEOUT));
         c.getProperties().put(SshClient.IDLE_TIMEOUT, Long.toString(DEFAULT_TIMEOUT));
 
@@ -53,22 +54,22 @@ public class AsyncSshHandler extends ChannelOutboundHandlerAdapter {
         DEFAULT_CLIENT = c;
     }
 
-    private final AuthenticationHandler authenticationHandler;
-    private final SshClient sshClient;
     private final AtomicBoolean isDisconnected = new AtomicBoolean();
-    private Future<?> negotiationFuture;
+    private final AuthenticationHandler authenticationHandler;
+    private final Future<?> negotiationFuture;
+    private final NetconfSshClient sshClient;
 
-    private AsyncSshHandlerReader sshReadAsyncListener;
     private AsyncSshHandlerWriter sshWriteAsyncHandler;
 
-    private ClientChannel channel;
+    private NettyAwareChannelSubsystem channel;
     private ClientSession session;
     private ChannelPromise connectPromise;
     private GenericFutureListener negotiationFutureListener;
 
-    public AsyncSshHandler(final AuthenticationHandler authenticationHandler, final SshClient sshClient,
+    public AsyncSshHandler(final AuthenticationHandler authenticationHandler, final NetconfSshClient sshClient,
             final Future<?> negotiationFuture) {
-        this(authenticationHandler, sshClient);
+        this.authenticationHandler = requireNonNull(authenticationHandler);
+        this.sshClient = requireNonNull(sshClient);
         this.negotiationFuture = negotiationFuture;
     }
 
@@ -79,9 +80,8 @@ public class AsyncSshHandler extends ChannelOutboundHandlerAdapter {
      * @param sshClient             started SshClient
      */
     public AsyncSshHandler(final AuthenticationHandler authenticationHandler,
-                           final SshClient sshClient) {
-        this.authenticationHandler = Preconditions.checkNotNull(authenticationHandler);
-        this.sshClient = Preconditions.checkNotNull(sshClient);
+                           final NetconfSshClient sshClient) {
+        this(authenticationHandler, sshClient, null);
     }
 
     public static AsyncSshHandler createForNetconfSubsystem(final AuthenticationHandler authenticationHandler) {
@@ -97,7 +97,7 @@ public class AsyncSshHandler extends ChannelOutboundHandlerAdapter {
      * @return                      {@code AsyncSshHandler}
      */
     public static AsyncSshHandler createForNetconfSubsystem(final AuthenticationHandler authenticationHandler,
-            final Future<?> negotiationFuture, @Nullable final SshClient sshClient) {
+            final Future<?> negotiationFuture, final @Nullable NetconfSshClient sshClient) {
         return new AsyncSshHandler(authenticationHandler, sshClient != null ? sshClient : DEFAULT_CLIENT,
                 negotiationFuture);
     }
@@ -121,8 +121,10 @@ public class AsyncSshHandler extends ChannelOutboundHandlerAdapter {
             LOG.trace("SSH session created on channel: {}", ctx.channel());
 
             session = future.getSession();
+            verify(session instanceof NettyAwareClientSession, "Unexpected session %s", session);
+
             final AuthFuture authenticateFuture = authenticationHandler.authenticate(session);
-            final ClientSession localSession = session;
+            final NettyAwareClientSession localSession = (NettyAwareClientSession) session;
             authenticateFuture.addListener(future1 -> {
                 if (future1.isSuccess()) {
                     handleSshAuthenticated(localSession, ctx);
@@ -136,12 +138,13 @@ public class AsyncSshHandler extends ChannelOutboundHandlerAdapter {
         }
     }
 
-    private synchronized void handleSshAuthenticated(final ClientSession newSession, final ChannelHandlerContext ctx) {
+    private synchronized void handleSshAuthenticated(final NettyAwareClientSession newSession,
+            final ChannelHandlerContext ctx) {
         try {
             LOG.debug("SSH session authenticated on channel: {}, server version: {}", ctx.channel(),
                     newSession.getServerVersion());
 
-            channel = newSession.createSubsystemChannel(SUBSYSTEM);
+            channel = newSession.createSubsystemChannel(SUBSYSTEM, ctx);
             channel.setStreaming(ClientChannel.Streaming.Async);
             channel.open().addListener(future -> {
                 if (future.isOpened()) {
@@ -164,18 +167,9 @@ public class AsyncSshHandler extends ChannelOutboundHandlerAdapter {
             connectPromise.setSuccess();
         }
 
-        // TODO we should also read from error stream and at least log from that
-
-        ClientChannel localChannel = channel;
-        sshReadAsyncListener = new AsyncSshHandlerReader(() -> AsyncSshHandler.this.disconnect(ctx, ctx.newPromise()),
-            ctx::fireChannelRead, localChannel.toString(), localChannel.getAsyncOut());
-
-        // if readAsyncListener receives immediate close,
-        // it will close this handler and closing this handler sets channel variable to null
-        if (channel != null) {
-            sshWriteAsyncHandler = new AsyncSshHandlerWriter(channel.getAsyncIn());
-            ctx.fireChannelActive();
-        }
+        sshWriteAsyncHandler = new AsyncSshHandlerWriter(channel.getAsyncIn());
+        ctx.fireChannelActive();
+        channel.onClose(() -> disconnect(ctx, ctx.newPromise()));
     }
 
     private synchronized void handleSshSetupFailure(final ChannelHandlerContext ctx, final Throwable error) {
@@ -239,10 +233,6 @@ public class AsyncSshHandler extends ChannelOutboundHandlerAdapter {
             sshWriteAsyncHandler.close();
         }
 
-        if (sshReadAsyncListener != null) {
-            sshReadAsyncListener.close();
-        }
-
         //If connection promise is not already set, it means negotiation failed
         //we must set connection promise to failure
         if (!connectPromise.isDone()) {
@@ -278,7 +268,12 @@ public class AsyncSshHandler extends ChannelOutboundHandlerAdapter {
             LOG.warn("Unable to cleanup all resources for channel: {}. Ignoring.", ctx.channel(), e);
         }
 
-        channel = null;
+        if (channel != null) {
+            //TODO: see if calling just close() is sufficient
+            //channel.close(false);
+            channel.close();
+            channel = null;
+        }
         promise.setSuccess();
         LOG.debug("SSH session closed on channel: {}", ctx.channel());
     }
diff --git a/netconf/netconf-netty-util/src/main/java/org/opendaylight/netconf/nettyutil/handler/ssh/client/NetconfClientBuilder.java b/netconf/netconf-netty-util/src/main/java/org/opendaylight/netconf/nettyutil/handler/ssh/client/NetconfClientBuilder.java
new file mode 100644 (file)
index 0000000..d0efcf6
--- /dev/null
@@ -0,0 +1,35 @@
+/*
+ * Copyright (c) 2019 PANTHEON.tech, s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.netconf.nettyutil.handler.ssh.client;
+
+import static com.google.common.base.Verify.verify;
+
+import com.google.common.annotations.Beta;
+import org.opendaylight.netconf.shaded.sshd.client.ClientBuilder;
+import org.opendaylight.netconf.shaded.sshd.client.SshClient;
+
+/**
+ * A {@link ClientBuilder} which builds {@link NetconfSshClient} instances.
+ */
+@Beta
+public class NetconfClientBuilder extends ClientBuilder {
+    @Override
+    public NetconfSshClient build() {
+        final SshClient client = super.build();
+        verify(client instanceof NetconfSshClient, "Unexpected client %s", client);
+        return (NetconfSshClient) client;
+    }
+
+    @Override
+    protected ClientBuilder fillWithDefaultValues() {
+        if (factory == null) {
+            factory = NetconfSshClient.DEFAULT_NETCONF_SSH_CLIENT_FACTORY;
+        }
+        return super.fillWithDefaultValues();
+    }
+}
diff --git a/netconf/netconf-netty-util/src/main/java/org/opendaylight/netconf/nettyutil/handler/ssh/client/NetconfClientSessionImpl.java b/netconf/netconf-netty-util/src/main/java/org/opendaylight/netconf/nettyutil/handler/ssh/client/NetconfClientSessionImpl.java
new file mode 100644 (file)
index 0000000..3ce4c99
--- /dev/null
@@ -0,0 +1,43 @@
+/*
+ * Copyright (c) 2019 PANTHEON.tech, s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.netconf.nettyutil.handler.ssh.client;
+
+import com.google.common.annotations.Beta;
+import io.netty.channel.ChannelHandlerContext;
+import java.io.IOException;
+import org.opendaylight.netconf.shaded.sshd.client.ClientFactoryManager;
+import org.opendaylight.netconf.shaded.sshd.client.SshClient;
+import org.opendaylight.netconf.shaded.sshd.client.session.ClientSessionImpl;
+import org.opendaylight.netconf.shaded.sshd.common.Factory;
+import org.opendaylight.netconf.shaded.sshd.common.io.IoSession;
+import org.opendaylight.netconf.shaded.sshd.common.session.ConnectionService;
+
+/**
+ * A {@link ClientSessionImpl} which additionally allows creation of NETCONF subsystem channel, which is routed to
+ * a particular {@link ChannelHandlerContext}.
+ */
+@Beta
+public class NetconfClientSessionImpl extends ClientSessionImpl implements NettyAwareClientSession {
+    public static final Factory<SshClient> DEFAULT_NETCONF_SSH_CLIENT_FACTORY = SshClient::new;
+
+    public NetconfClientSessionImpl(final ClientFactoryManager client, final IoSession ioSession) throws Exception {
+        super(client, ioSession);
+    }
+
+    @Override
+    public NettyAwareChannelSubsystem createSubsystemChannel(final String subsystem, final ChannelHandlerContext ctx)
+            throws IOException {
+        final NettyAwareChannelSubsystem channel = new NettyAwareChannelSubsystem(subsystem, ctx);
+        final ConnectionService service = getConnectionService();
+        final int id = service.registerChannel(channel);
+        if (log.isDebugEnabled()) {
+            log.debug("createSubsystemChannel({})[{}] created id={}", this, channel.getSubsystem(), id);
+        }
+        return channel;
+    }
+}
diff --git a/netconf/netconf-netty-util/src/main/java/org/opendaylight/netconf/nettyutil/handler/ssh/client/NetconfSessionFactory.java b/netconf/netconf-netty-util/src/main/java/org/opendaylight/netconf/nettyutil/handler/ssh/client/NetconfSessionFactory.java
new file mode 100644 (file)
index 0000000..c3aa99a
--- /dev/null
@@ -0,0 +1,28 @@
+/*
+ * Copyright (c) 2019 PANTHEON.tech, s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.netconf.nettyutil.handler.ssh.client;
+
+import com.google.common.annotations.Beta;
+import org.opendaylight.netconf.shaded.sshd.client.ClientFactoryManager;
+import org.opendaylight.netconf.shaded.sshd.client.session.SessionFactory;
+import org.opendaylight.netconf.shaded.sshd.common.io.IoSession;
+
+/**
+ * A {@link SessionFactory} which creates {@link NetconfClientSessionImpl}s.
+ */
+@Beta
+public class NetconfSessionFactory extends SessionFactory {
+    public NetconfSessionFactory(final ClientFactoryManager client) {
+        super(client);
+    }
+
+    @Override
+    protected NetconfClientSessionImpl doCreateSession(final IoSession ioSession) throws Exception {
+        return new NetconfClientSessionImpl(getClient(), ioSession);
+    }
+}
diff --git a/netconf/netconf-netty-util/src/main/java/org/opendaylight/netconf/nettyutil/handler/ssh/client/NetconfSshClient.java b/netconf/netconf-netty-util/src/main/java/org/opendaylight/netconf/nettyutil/handler/ssh/client/NetconfSshClient.java
new file mode 100644 (file)
index 0000000..7b5308a
--- /dev/null
@@ -0,0 +1,44 @@
+/*
+ * Copyright (c) 2019 PANTHEON.tech, s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.netconf.nettyutil.handler.ssh.client;
+
+import com.google.common.annotations.Beta;
+import org.opendaylight.netconf.shaded.sshd.client.SshClient;
+import org.opendaylight.netconf.shaded.sshd.common.Factory;
+import org.opendaylight.netconf.shaded.sshd.common.forward.PortForwardingEventListener;
+import org.opendaylight.netconf.shaded.sshd.common.util.net.SshdSocketAddress;
+
+/**
+ * An extension to {@link SshClient} which uses {@link NetconfSessionFactory} to create sessions (leading towards
+ * {@link NetconfClientSessionImpl}.
+ */
+@Beta
+public class NetconfSshClient extends SshClient {
+    public static final Factory<SshClient> DEFAULT_NETCONF_SSH_CLIENT_FACTORY = NetconfSshClient::new;
+
+    /*
+     * This is a workaround for sshd-core's instantiation of Proxies. AbstractFactoryManager (which is our superclass)
+     * is calling Proxy.newProxyInstance() with getClass().getClassLoader(), i.e. our class loader.
+     *
+     * Since we are not using PortForwardingEventListener, our classloader does not see it (because we do not import
+     * that package), which leads to an instantiation failure.
+     *
+     * Having these dumb fields alleviates the problem, as it forces the packages to be imported by our bundle.
+     *
+     * FIXME: Remove this once we have an SSHD version with  https://issues.apache.org/jira/browse/SSHD-975 fixed
+     */
+    static final class Sshd975Workarounds {
+        static final PortForwardingEventListener PFEL = null;
+        static final SshdSocketAddress SSA = null;
+    }
+
+    @Override
+    protected NetconfSessionFactory createSessionFactory() {
+        return new NetconfSessionFactory(this);
+    }
+}
diff --git a/netconf/netconf-netty-util/src/main/java/org/opendaylight/netconf/nettyutil/handler/ssh/client/NettyAwareChannelSubsystem.java b/netconf/netconf-netty-util/src/main/java/org/opendaylight/netconf/nettyutil/handler/ssh/client/NettyAwareChannelSubsystem.java
new file mode 100644 (file)
index 0000000..5df448b
--- /dev/null
@@ -0,0 +1,55 @@
+/*
+ * Copyright (c) 2019 PANTHEON.tech, s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.netconf.nettyutil.handler.ssh.client;
+
+import static java.util.Objects.requireNonNull;
+
+import com.google.common.annotations.Beta;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelHandlerContext;
+import java.io.IOException;
+import org.opendaylight.netconf.shaded.sshd.client.channel.ChannelSubsystem;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link ChannelSubsystem} for subsystem which routes incoming data to a particular {@link ChannelHandlerContext}.
+ */
+@Beta
+public class NettyAwareChannelSubsystem extends ChannelSubsystem {
+    private static final Logger LOG = LoggerFactory.getLogger(NettyAwareChannelSubsystem.class);
+
+    private final ChannelHandlerContext ctx;
+
+    public NettyAwareChannelSubsystem(final String subsystem, final ChannelHandlerContext ctx) {
+        super(subsystem);
+        this.ctx = requireNonNull(ctx);
+    }
+
+    @Override
+    protected void doWriteData(final byte[] data, final int off, final long len) throws IOException {
+        // If we're already closing, ignore incoming data
+        if (!isClosing()) {
+            // TODO: consider using context's allocator for heap buffer here
+            ctx.fireChannelRead(Unpooled.copiedBuffer(data, off, (int) len));
+        }
+    }
+
+    @Override
+    protected void doWriteExtendedData(final byte[] data, final int off, final long len) throws IOException {
+        // If we're already closing, ignore incoming data
+        if (!isClosing()) {
+            LOG.debug("Discarding {} bytes of extended data", len);
+        }
+    }
+
+    @Override
+    public void close() {
+        this.close(false);
+    }
+}
diff --git a/netconf/netconf-netty-util/src/main/java/org/opendaylight/netconf/nettyutil/handler/ssh/client/NettyAwareClientSession.java b/netconf/netconf-netty-util/src/main/java/org/opendaylight/netconf/nettyutil/handler/ssh/client/NettyAwareClientSession.java
new file mode 100644 (file)
index 0000000..1d69030
--- /dev/null
@@ -0,0 +1,31 @@
+/*
+ * Copyright (c) 2019 PANTHEON.tech, s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.netconf.nettyutil.handler.ssh.client;
+
+import com.google.common.annotations.Beta;
+import io.netty.channel.ChannelHandlerContext;
+import java.io.IOException;
+import org.opendaylight.netconf.shaded.sshd.client.session.ClientSession;
+
+/**
+ * A {@link ClientSession} which additionally allows subsystem channels which are forwarded to a particular Netty
+ * channel context.
+ */
+@Beta
+public interface NettyAwareClientSession extends ClientSession {
+    /**
+     * Allocate a channel to the specified subsystem subsystem. Incoming data on the channel will be routed to the
+     * specified ChannelHandlerContext.
+     *
+     * @param subsystem The subsystem name
+     * @param ctx Context to which to route data to
+     * @return The created {@link NettyAwareChannelSubsystem}
+     * @throws IOException If failed to create the requested channel
+     */
+    NettyAwareChannelSubsystem createSubsystemChannel(String subsystem, ChannelHandlerContext ctx) throws IOException;
+}
diff --git a/netconf/netconf-netty-util/src/main/java/org/opendaylight/netconf/nettyutil/handler/ssh/client/package-info.java b/netconf/netconf-netty-util/src/main/java/org/opendaylight/netconf/nettyutil/handler/ssh/client/package-info.java
new file mode 100644 (file)
index 0000000..eb2f955
--- /dev/null
@@ -0,0 +1,12 @@
+/*
+ * Copyright (c) 2019 PANTHEON.tech, s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+/**
+ * Utilities for integration between Apache SSHD and Netty. Contains the wiring logic to extend SshClient to allow
+ * efficient shuffling of data towards the Netty channel.
+ */
+package org.opendaylight.netconf.nettyutil.handler.ssh.client;
\ No newline at end of file
index ca6f35d3080a286003462049919cc09b56c751fa..3de88012ca3bb5566002a8f813296c13f194a6e9 100644 (file)
@@ -10,7 +10,7 @@ package org.opendaylight.netconf.nettyutil.handler.ssh.client;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyBoolean;
 import static org.mockito.ArgumentMatchers.anyObject;
-import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doReturn;
@@ -26,7 +26,6 @@ import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.SettableFuture;
-import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelConfig;
@@ -45,8 +44,6 @@ import org.junit.Test;
 import org.mockito.Mock;
 import org.mockito.MockitoAnnotations;
 import org.opendaylight.netconf.nettyutil.handler.ssh.authentication.AuthenticationHandler;
-import org.opendaylight.netconf.shaded.sshd.client.SshClient;
-import org.opendaylight.netconf.shaded.sshd.client.channel.ChannelSubsystem;
 import org.opendaylight.netconf.shaded.sshd.client.channel.ClientChannel;
 import org.opendaylight.netconf.shaded.sshd.client.future.AuthFuture;
 import org.opendaylight.netconf.shaded.sshd.client.future.ConnectFuture;
@@ -66,7 +63,7 @@ import org.opendaylight.netconf.shaded.sshd.common.util.buffer.ByteArrayBuffer;
 public class AsyncSshHandlerTest {
 
     @Mock
-    private SshClient sshClient;
+    private NetconfSshClient sshClient;
     @Mock
     private AuthenticationHandler authHandler;
     @Mock
@@ -87,7 +84,6 @@ public class AsyncSshHandlerTest {
     private SshFutureListener<ConnectFuture> sshConnectListener;
     private SshFutureListener<AuthFuture> sshAuthListener;
     private SshFutureListener<OpenFuture> sshChannelOpenListener;
-
     private ChannelPromise promise;
 
     @Before
@@ -182,7 +178,7 @@ public class AsyncSshHandlerTest {
 
         final IoInputStream asyncOut = getMockedIoInputStream();
         final IoOutputStream asyncIn = getMockedIoOutputStream();
-        final ChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
+        final NettyAwareChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
         final ClientSession sshSession = getMockedSshSession(subsystemChannel);
         final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
 
@@ -194,80 +190,7 @@ public class AsyncSshHandlerTest {
 
         verify(promise).setSuccess();
         verify(ctx).fireChannelActive();
-    }
-
-    @Test
-    public void testRead() throws Exception {
-        asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
-
-        final IoInputStream asyncOut = getMockedIoInputStream();
-        final IoOutputStream asyncIn = getMockedIoOutputStream();
-        final ChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
-        final ClientSession sshSession = getMockedSshSession(subsystemChannel);
-        final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
-
-        sshConnectListener.operationComplete(connectFuture);
-        sshAuthListener.operationComplete(getSuccessAuthFuture());
-        sshChannelOpenListener.operationComplete(getSuccessOpenFuture());
-
-        verify(ctx).fireChannelRead(any(ByteBuf.class));
-    }
-
-    @Test
-    public void testReadClosed() throws Exception {
-        asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
-
-        final IoInputStream asyncOut = getMockedIoInputStream();
-        final IoReadFuture mockedReadFuture = asyncOut.read(null);
-
-        Futures.addCallback(stubAddListener(mockedReadFuture), new SuccessFutureListener<IoReadFuture>() {
-            @Override
-            public void onSuccess(final SshFutureListener<IoReadFuture> result) {
-                doReturn(new IllegalStateException()).when(mockedReadFuture).getException();
-                doReturn(mockedReadFuture).when(mockedReadFuture).removeListener(any());
-                doReturn(true).when(asyncOut).isClosing();
-                doReturn(true).when(asyncOut).isClosed();
-                result.operationComplete(mockedReadFuture);
-            }
-        }, MoreExecutors.directExecutor());
-
-        final IoOutputStream asyncIn = getMockedIoOutputStream();
-        final ChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
-        final ClientSession sshSession = getMockedSshSession(subsystemChannel);
-        final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
-
-        sshConnectListener.operationComplete(connectFuture);
-        sshAuthListener.operationComplete(getSuccessAuthFuture());
-        sshChannelOpenListener.operationComplete(getSuccessOpenFuture());
-
-        verify(ctx).fireChannelInactive();
-    }
-
-    @Test
-    public void testReadFail() throws Exception {
-        asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
-
-        final IoInputStream asyncOut = getMockedIoInputStream();
-        final IoReadFuture mockedReadFuture = asyncOut.read(null);
-
-        Futures.addCallback(stubAddListener(mockedReadFuture), new SuccessFutureListener<IoReadFuture>() {
-            @Override
-            public void onSuccess(final SshFutureListener<IoReadFuture> result) {
-                doReturn(new IllegalStateException()).when(mockedReadFuture).getException();
-                doReturn(mockedReadFuture).when(mockedReadFuture).removeListener(any());
-                result.operationComplete(mockedReadFuture);
-            }
-        }, MoreExecutors.directExecutor());
-
-        final IoOutputStream asyncIn = getMockedIoOutputStream();
-        final ChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
-        final ClientSession sshSession = getMockedSshSession(subsystemChannel);
-        final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
-
-        sshConnectListener.operationComplete(connectFuture);
-        sshAuthListener.operationComplete(getSuccessAuthFuture());
-        sshChannelOpenListener.operationComplete(getSuccessOpenFuture());
-
+        asyncSshHandler.close(ctx, getMockedPromise());
         verify(ctx).fireChannelInactive();
     }
 
@@ -277,7 +200,7 @@ public class AsyncSshHandlerTest {
 
         final IoInputStream asyncOut = getMockedIoInputStream();
         final IoOutputStream asyncIn = getMockedIoOutputStream();
-        final ChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
+        final NettyAwareChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
         final ClientSession sshSession = getMockedSshSession(subsystemChannel);
         final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
 
@@ -311,7 +234,7 @@ public class AsyncSshHandlerTest {
             }
         }, MoreExecutors.directExecutor());
 
-        final ChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
+        final NettyAwareChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
         final ClientSession sshSession = getMockedSshSession(subsystemChannel);
         final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
 
@@ -333,7 +256,7 @@ public class AsyncSshHandlerTest {
         final IoOutputStream asyncIn = getMockedIoOutputStream();
         final IoWriteFuture ioWriteFuture = asyncIn.writePacket(new ByteArrayBuffer());
 
-        final ChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
+        final NettyAwareChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
         final ClientSession sshSession = getMockedSshSession(subsystemChannel);
         final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
 
@@ -382,7 +305,7 @@ public class AsyncSshHandlerTest {
         final IoOutputStream asyncIn = getMockedIoOutputStream();
         final IoWriteFuture ioWriteFuture = asyncIn.writePacket(new ByteArrayBuffer());
 
-        final ChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
+        final NettyAwareChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
         final ClientSession sshSession = getMockedSshSession(subsystemChannel);
         final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
 
@@ -414,7 +337,7 @@ public class AsyncSshHandlerTest {
 
         final IoInputStream asyncOut = getMockedIoInputStream();
         final IoOutputStream asyncIn = getMockedIoOutputStream();
-        final ChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
+        final NettyAwareChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
         final ClientSession sshSession = getMockedSshSession(subsystemChannel);
         final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
 
@@ -427,7 +350,7 @@ public class AsyncSshHandlerTest {
 
         verify(sshSession).close(anyBoolean());
         verify(disconnectPromise).setSuccess();
-        verify(ctx).fireChannelInactive();
+        //verify(ctx).fireChannelInactive();
     }
 
     private static OpenFuture getSuccessOpenFuture() {
@@ -450,8 +373,9 @@ public class AsyncSshHandlerTest {
         return connectFuture;
     }
 
-    private static ClientSession getMockedSshSession(final ChannelSubsystem subsystemChannel) throws IOException {
-        final ClientSession sshSession = mock(ClientSession.class);
+    private static NettyAwareClientSession getMockedSshSession(final NettyAwareChannelSubsystem subsystemChannel)
+            throws IOException {
+        final NettyAwareClientSession sshSession = mock(NettyAwareClientSession.class);
 
         doReturn("sshSession").when(sshSession).toString();
         doReturn("serverVersion").when(sshSession).getServerVersion();
@@ -467,14 +391,14 @@ public class AsyncSshHandlerTest {
         }, MoreExecutors.directExecutor());
         doReturn(closeFuture).when(sshSession).close(false);
 
-        doReturn(subsystemChannel).when(sshSession).createSubsystemChannel(anyString());
+        doReturn(subsystemChannel).when(sshSession).createSubsystemChannel(eq("netconf"), any());
 
         return sshSession;
     }
 
-    private ChannelSubsystem getMockedSubsystemChannel(final IoInputStream asyncOut,
+    private NettyAwareChannelSubsystem getMockedSubsystemChannel(final IoInputStream asyncOut,
                                                        final IoOutputStream asyncIn) throws IOException {
-        final ChannelSubsystem subsystemChannel = mock(ChannelSubsystem.class);
+        final NettyAwareChannelSubsystem subsystemChannel = mock(NettyAwareChannelSubsystem.class);
         doReturn("subsystemChannel").when(subsystemChannel).toString();
 
         doNothing().when(subsystemChannel).setStreaming(any(ClientChannel.Streaming.class));
@@ -491,6 +415,8 @@ public class AsyncSshHandlerTest {
 
         doReturn(openFuture).when(subsystemChannel).open();
         doReturn(asyncIn).when(subsystemChannel).getAsyncIn();
+        doNothing().when(subsystemChannel).onClose(any());
+        doNothing().when(subsystemChannel).close();
         return subsystemChannel;
     }
 
@@ -543,7 +469,7 @@ public class AsyncSshHandlerTest {
 
         final IoInputStream asyncOut = getMockedIoInputStream();
         final IoOutputStream asyncIn = getMockedIoOutputStream();
-        final ChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
+        final NettyAwareChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
         final ClientSession sshSession = getMockedSshSession(subsystemChannel);
         final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
 
@@ -561,7 +487,7 @@ public class AsyncSshHandlerTest {
     public void testConnectFailAuth() throws Exception {
         asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
 
-        final ClientSession sshSession = mock(ClientSession.class);
+        final NettyAwareClientSession sshSession = mock(NettyAwareClientSession.class);
         doReturn(true).when(sshSession).isClosed();
         final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
 
@@ -571,6 +497,8 @@ public class AsyncSshHandlerTest {
 
         sshAuthListener.operationComplete(authFuture);
         verify(promise).setFailure(any(Throwable.class));
+        asyncSshHandler.close(ctx, getMockedPromise());
+        verify(ctx, times(0)).fireChannelInactive();
     }
 
     private static AuthFuture getFailedAuthFuture() {
index 6df6fb4427686e9c86846553c939530c252f6b89..c14bc0b9f34eb64a0527826559d10ea408f5f044 100644 (file)
@@ -14,14 +14,14 @@ import org.eclipse.jdt.annotation.Nullable;
 import org.opendaylight.netconf.client.NetconfClientDispatcherImpl;
 import org.opendaylight.netconf.client.NetconfClientSessionNegotiatorFactory;
 import org.opendaylight.netconf.client.conf.NetconfClientConfiguration;
-import org.opendaylight.netconf.shaded.sshd.client.SshClient;
+import org.opendaylight.netconf.nettyutil.handler.ssh.client.NetconfSshClient;
 
 public final class ConfigurableClientDispatcher extends NetconfClientDispatcherImpl {
 
     private final Set<String> capabilities;
 
     private ConfigurableClientDispatcher(final EventLoopGroup bossGroup, final EventLoopGroup workerGroup,
-            final Timer timer, final Set<String> capabilities, @Nullable final SshClient sshClient) {
+            final Timer timer, final Set<String> capabilities, final @Nullable NetconfSshClient sshClient) {
         super(bossGroup, workerGroup, timer, sshClient);
         this.capabilities = capabilities;
     }
@@ -30,7 +30,7 @@ public final class ConfigurableClientDispatcher extends NetconfClientDispatcherI
      * EXI + chunked framing.
      */
     public static ConfigurableClientDispatcher createChunkedExi(final EventLoopGroup bossGroup,
-            final EventLoopGroup workerGroup, final Timer timer, @Nullable final SshClient sshClient) {
+            final EventLoopGroup workerGroup, final Timer timer, final @Nullable NetconfSshClient sshClient) {
         return new ConfigurableClientDispatcher(bossGroup, workerGroup, timer,
             NetconfClientSessionNegotiatorFactory.EXI_CLIENT_CAPABILITIES, sshClient);
     }
@@ -39,7 +39,7 @@ public final class ConfigurableClientDispatcher extends NetconfClientDispatcherI
      * EXI + ]]gt;]]gt; framing.
      */
     public static ConfigurableClientDispatcher createLegacyExi(final EventLoopGroup bossGroup,
-            final EventLoopGroup workerGroup, final Timer timer, @Nullable final SshClient sshClient) {
+            final EventLoopGroup workerGroup, final Timer timer, final @Nullable NetconfSshClient sshClient) {
         return new ConfigurableClientDispatcher(bossGroup, workerGroup, timer,
             NetconfClientSessionNegotiatorFactory.LEGACY_EXI_CLIENT_CAPABILITIES, sshClient);
     }
@@ -48,7 +48,7 @@ public final class ConfigurableClientDispatcher extends NetconfClientDispatcherI
      * Chunked framing.
      */
     public static ConfigurableClientDispatcher createChunked(final EventLoopGroup bossGroup,
-            final EventLoopGroup workerGroup, final Timer timer, @Nullable final SshClient sshClient) {
+            final EventLoopGroup workerGroup, final Timer timer, final @Nullable NetconfSshClient sshClient) {
         return new ConfigurableClientDispatcher(bossGroup, workerGroup, timer,
             NetconfClientSessionNegotiatorFactory.DEFAULT_CLIENT_CAPABILITIES, sshClient);
     }
@@ -57,7 +57,7 @@ public final class ConfigurableClientDispatcher extends NetconfClientDispatcherI
      * ]]gt;]]gt; framing.
      */
     public static ConfigurableClientDispatcher createLegacy(final EventLoopGroup bossGroup,
-            final EventLoopGroup workerGroup, final Timer timer, @Nullable final SshClient sshClient) {
+            final EventLoopGroup workerGroup, final Timer timer, final @Nullable NetconfSshClient sshClient) {
         return new ConfigurableClientDispatcher(bossGroup, workerGroup, timer,
             NetconfClientSessionNegotiatorFactory.LEGACY_FRAMING_CLIENT_CAPABILITIES, sshClient);
     }