BUG 2221 : Add metering to ShardTransaction actor
[controller.git] / opendaylight / netconf / netconf-ssh / src / main / java / org / opendaylight / controller / netconf / ssh / threads / Handshaker.java
index d999d378d9af12c56298f292b9b0723b1d9bbe38..eec6c3a0971d8a3369ee05a863540bed4b85f8b2 100644 (file)
@@ -10,6 +10,20 @@ package org.opendaylight.controller.netconf.ssh.threads;
 import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Preconditions.checkState;
 
+import java.io.BufferedOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.Socket;
+
+import javax.annotation.concurrent.NotThreadSafe;
+import javax.annotation.concurrent.ThreadSafe;
+
+import org.opendaylight.controller.netconf.auth.AuthProvider;
+import org.opendaylight.controller.netconf.util.messages.NetconfHelloMessageAdditionalHeader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import ch.ethz.ssh2.AuthenticationResult;
 import ch.ethz.ssh2.PtySettings;
 import ch.ethz.ssh2.ServerAuthenticationCallback;
@@ -18,7 +32,9 @@ import ch.ethz.ssh2.ServerConnectionCallback;
 import ch.ethz.ssh2.ServerSession;
 import ch.ethz.ssh2.ServerSessionCallback;
 import ch.ethz.ssh2.SimpleServerSessionCallback;
+
 import com.google.common.base.Supplier;
+
 import io.netty.bootstrap.Bootstrap;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufProcessor;
@@ -32,16 +48,6 @@ import io.netty.channel.EventLoopGroup;
 import io.netty.channel.local.LocalAddress;
 import io.netty.channel.local.LocalChannel;
 import io.netty.handler.stream.ChunkedStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.net.Socket;
-import javax.annotation.concurrent.NotThreadSafe;
-import javax.annotation.concurrent.ThreadSafe;
-import org.opendaylight.controller.netconf.ssh.authentication.AuthProvider;
-import org.opendaylight.controller.netconf.util.messages.NetconfHelloMessageAdditionalHeader;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * One instance represents per connection, responsible for ssh handshake.
@@ -57,7 +63,7 @@ public class Handshaker implements Runnable {
 
 
     public Handshaker(Socket socket, LocalAddress localAddress, long sessionId, AuthProvider authProvider,
-                      EventLoopGroup bossGroup) throws IOException {
+                      EventLoopGroup bossGroup, final char[] pem) throws IOException {
 
         this.session = "Session " + sessionId;
 
@@ -82,7 +88,7 @@ public class Handshaker implements Runnable {
                 getGanymedAutoCloseable(ganymedConnection), localAddress, bossGroup);
 
         // initialize ganymed
-        ganymedConnection.setPEMHostKey(authProvider.getPEMAsCharArray(), null);
+        ganymedConnection.setPEMHostKey(pem, null);
         ganymedConnection.setAuthenticationCallback(serverAuthenticationCallback);
         ganymedConnection.setServerConnectionCallback(serverConnectionCallback);
     }
@@ -100,14 +106,14 @@ public class Handshaker implements Runnable {
     @Override
     public void run() {
         // let ganymed process handshake
-        logger.trace("{} SocketThread is started", session);
+        logger.trace("{} is started", session);
         try {
             // TODO this should be guarded with a timer to prevent resource exhaustion
             ganymedConnection.connect();
         } catch (IOException e) {
-            logger.warn("{} SocketThread error ", session, e);
+            logger.debug("{} connection error", session, e);
         }
-        logger.trace("{} SocketThread is exiting", session);
+        logger.trace("{} is exiting", session);
     }
 }
 
@@ -119,14 +125,14 @@ public class Handshaker implements Runnable {
 class SSHClientHandler extends ChannelInboundHandlerAdapter {
     private static final Logger logger = LoggerFactory.getLogger(SSHClientHandler.class);
     private final AutoCloseable remoteConnection;
-    private final OutputStream remoteOutputStream;
+    private final BufferedOutputStream remoteOutputStream;
     private final String session;
     private ChannelHandlerContext channelHandlerContext;
 
     public SSHClientHandler(AutoCloseable remoteConnection, OutputStream remoteOutputStream,
                             String session) {
         this.remoteConnection = remoteConnection;
-        this.remoteOutputStream = remoteOutputStream;
+        this.remoteOutputStream = new BufferedOutputStream(remoteOutputStream);
         this.session = session;
     }
 
@@ -137,7 +143,7 @@ class SSHClientHandler extends ChannelInboundHandlerAdapter {
     }
 
     @Override
-    public void channelRead(ChannelHandlerContext ctx, Object msg) {
+    public void channelRead(ChannelHandlerContext ctx, Object msg) throws IOException {
         ByteBuf bb = (ByteBuf) msg;
         // we can block the server here so that slow client does not cause memory pressure
         try {
@@ -242,6 +248,12 @@ class ServerConnectionCallbackImpl implements ServerConnectionCallback {
                             ChannelFuture clientChannelFuture = initializeNettyConnection(localAddress, bossGroup, sshClientHandler);
                             // get channel
                             final Channel channel = clientChannelFuture.awaitUninterruptibly().channel();
+
+                            // write additional header before polling thread is started
+                            // polling thread could process and forward data before additional header is written
+                            // This will result into unexpected state:  hello message without additional header and the next message with additional header
+                            channel.writeAndFlush(Unpooled.copiedBuffer(additionalHeader.getBytes()));
+
                             new ClientInputStreamPoolingThread(session, ss.getStdout(), channel, new AutoCloseable() {
                                 @Override
                                 public void close() throws Exception {
@@ -258,9 +270,6 @@ class ServerConnectionCallbackImpl implements ServerConnectionCallback {
                                     }
                                 }
                             }, sshClientHandler.getChannelHandlerContext()).start();
-
-                            // write additional header
-                            channel.writeAndFlush(Unpooled.copiedBuffer(additionalHeader.getBytes()));
                         } else {
                             logger.debug("{} Wrong subsystem requested:'{}', closing ssh session", serverSession, subsystem);
                             String reason = "Only netconf subsystem is supported, requested:" + subsystem;