X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fnetconf%2Fnetconf-ssh%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fnetconf%2Fssh%2Fthreads%2FHandshaker.java;h=eec6c3a0971d8a3369ee05a863540bed4b85f8b2;hp=d999d378d9af12c56298f292b9b0723b1d9bbe38;hb=818258c2370c687de93edc887b32019d25c34095;hpb=022c6b13e92d53db58fbcb3d754e3164357030b8 diff --git a/opendaylight/netconf/netconf-ssh/src/main/java/org/opendaylight/controller/netconf/ssh/threads/Handshaker.java b/opendaylight/netconf/netconf-ssh/src/main/java/org/opendaylight/controller/netconf/ssh/threads/Handshaker.java index d999d378d9..eec6c3a097 100644 --- a/opendaylight/netconf/netconf-ssh/src/main/java/org/opendaylight/controller/netconf/ssh/threads/Handshaker.java +++ b/opendaylight/netconf/netconf-ssh/src/main/java/org/opendaylight/controller/netconf/ssh/threads/Handshaker.java @@ -10,6 +10,20 @@ package org.opendaylight.controller.netconf.ssh.threads; import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkState; +import java.io.BufferedOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.Socket; + +import javax.annotation.concurrent.NotThreadSafe; +import javax.annotation.concurrent.ThreadSafe; + +import org.opendaylight.controller.netconf.auth.AuthProvider; +import org.opendaylight.controller.netconf.util.messages.NetconfHelloMessageAdditionalHeader; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import ch.ethz.ssh2.AuthenticationResult; import ch.ethz.ssh2.PtySettings; import ch.ethz.ssh2.ServerAuthenticationCallback; @@ -18,7 +32,9 @@ import ch.ethz.ssh2.ServerConnectionCallback; import ch.ethz.ssh2.ServerSession; import ch.ethz.ssh2.ServerSessionCallback; import ch.ethz.ssh2.SimpleServerSessionCallback; + import com.google.common.base.Supplier; + import io.netty.bootstrap.Bootstrap; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufProcessor; @@ -32,16 +48,6 @@ import io.netty.channel.EventLoopGroup; import io.netty.channel.local.LocalAddress; import io.netty.channel.local.LocalChannel; import io.netty.handler.stream.ChunkedStream; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.net.Socket; -import javax.annotation.concurrent.NotThreadSafe; -import javax.annotation.concurrent.ThreadSafe; -import org.opendaylight.controller.netconf.ssh.authentication.AuthProvider; -import org.opendaylight.controller.netconf.util.messages.NetconfHelloMessageAdditionalHeader; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * One instance represents per connection, responsible for ssh handshake. @@ -57,7 +63,7 @@ public class Handshaker implements Runnable { public Handshaker(Socket socket, LocalAddress localAddress, long sessionId, AuthProvider authProvider, - EventLoopGroup bossGroup) throws IOException { + EventLoopGroup bossGroup, final char[] pem) throws IOException { this.session = "Session " + sessionId; @@ -82,7 +88,7 @@ public class Handshaker implements Runnable { getGanymedAutoCloseable(ganymedConnection), localAddress, bossGroup); // initialize ganymed - ganymedConnection.setPEMHostKey(authProvider.getPEMAsCharArray(), null); + ganymedConnection.setPEMHostKey(pem, null); ganymedConnection.setAuthenticationCallback(serverAuthenticationCallback); ganymedConnection.setServerConnectionCallback(serverConnectionCallback); } @@ -100,14 +106,14 @@ public class Handshaker implements Runnable { @Override public void run() { // let ganymed process handshake - logger.trace("{} SocketThread is started", session); + logger.trace("{} is started", session); try { // TODO this should be guarded with a timer to prevent resource exhaustion ganymedConnection.connect(); } catch (IOException e) { - logger.warn("{} SocketThread error ", session, e); + logger.debug("{} connection error", session, e); } - logger.trace("{} SocketThread is exiting", session); + logger.trace("{} is exiting", session); } } @@ -119,14 +125,14 @@ public class Handshaker implements Runnable { class SSHClientHandler extends ChannelInboundHandlerAdapter { private static final Logger logger = LoggerFactory.getLogger(SSHClientHandler.class); private final AutoCloseable remoteConnection; - private final OutputStream remoteOutputStream; + private final BufferedOutputStream remoteOutputStream; private final String session; private ChannelHandlerContext channelHandlerContext; public SSHClientHandler(AutoCloseable remoteConnection, OutputStream remoteOutputStream, String session) { this.remoteConnection = remoteConnection; - this.remoteOutputStream = remoteOutputStream; + this.remoteOutputStream = new BufferedOutputStream(remoteOutputStream); this.session = session; } @@ -137,7 +143,7 @@ class SSHClientHandler extends ChannelInboundHandlerAdapter { } @Override - public void channelRead(ChannelHandlerContext ctx, Object msg) { + public void channelRead(ChannelHandlerContext ctx, Object msg) throws IOException { ByteBuf bb = (ByteBuf) msg; // we can block the server here so that slow client does not cause memory pressure try { @@ -242,6 +248,12 @@ class ServerConnectionCallbackImpl implements ServerConnectionCallback { ChannelFuture clientChannelFuture = initializeNettyConnection(localAddress, bossGroup, sshClientHandler); // get channel final Channel channel = clientChannelFuture.awaitUninterruptibly().channel(); + + // write additional header before polling thread is started + // polling thread could process and forward data before additional header is written + // This will result into unexpected state: hello message without additional header and the next message with additional header + channel.writeAndFlush(Unpooled.copiedBuffer(additionalHeader.getBytes())); + new ClientInputStreamPoolingThread(session, ss.getStdout(), channel, new AutoCloseable() { @Override public void close() throws Exception { @@ -258,9 +270,6 @@ class ServerConnectionCallbackImpl implements ServerConnectionCallback { } } }, sshClientHandler.getChannelHandlerContext()).start(); - - // write additional header - channel.writeAndFlush(Unpooled.copiedBuffer(additionalHeader.getBytes())); } else { logger.debug("{} Wrong subsystem requested:'{}', closing ssh session", serverSession, subsystem); String reason = "Only netconf subsystem is supported, requested:" + subsystem;