Fixed handshake issues with SSH Netconf client
[controller.git] / opendaylight / netconf / netconf-util / src / main / java / org / opendaylight / controller / netconf / util / handler / ssh / client / SshClientAdapter.java
index a50462e40dae1e3c5ce40afd451713992ad28e2a..4213fe3e0642db6257b3e655c407a4394785fca3 100644 (file)
@@ -12,14 +12,19 @@ import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelPromise;
+
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.util.LinkedList;
+import java.util.Queue;
 import java.util.concurrent.atomic.AtomicBoolean;
 import org.opendaylight.controller.netconf.util.handler.ssh.virtualsocket.VirtualSocketException;
 
+
 /**
- * Worker thread class. Handles all downstream and upstream events in SSH Netty pipeline.
+ * Worker thread class. Handles all downstream and upstream events in SSH Netty
+ * pipeline.
  */
 public class SshClientAdapter implements Runnable {
     private final SshClient sshClient;
@@ -30,6 +35,9 @@ public class SshClientAdapter implements Runnable {
     private InputStream stdErr;
     private OutputStream stdIn;
 
+    private Queue<ByteBuf> postponned = new LinkedList<>();
+
+
     private ChannelHandlerContext ctx;
     private ChannelPromise disconnectPromise;
 
@@ -37,8 +45,7 @@ public class SshClientAdapter implements Runnable {
 
     private final Object lock = new Object();
 
-    public SshClientAdapter(SshClient sshClient,
-                            Invoker invoker) {
+    public SshClientAdapter(SshClient sshClient, Invoker invoker) {
         this.sshClient = sshClient;
         this.invoker = invoker;
     }
@@ -47,18 +54,24 @@ public class SshClientAdapter implements Runnable {
         try {
             session = sshClient.openSession();
             invoker.invoke(session);
-
             stdOut = session.getStdout();
             stdErr = session.getStderr();
 
-            synchronized(lock) {
+            synchronized (lock) {
+
                 stdIn = session.getStdin();
+                ByteBuf message = null;
+                while ((message = postponned.poll()) != null) {
+                    writeImpl(message);
+                }
             }
 
             while (stopRequested.get() == false) {
                 byte[] readBuff = new byte[1024];
                 int c = stdOut.read(readBuff);
-
+                if (c == -1) {
+                    continue;
+                }
                 byte[] tranBuff = new byte[c];
                 System.arraycopy(readBuff, 0, tranBuff, 0, c);
 
@@ -76,17 +89,25 @@ public class SshClientAdapter implements Runnable {
             sshClient.close();
 
             synchronized (lock) {
-                if(disconnectPromise != null) ctx.disconnect(disconnectPromise);
+                if (disconnectPromise != null)
+                    ctx.disconnect(disconnectPromise);
             }
         }
     }
 
     // TODO: needs rework to match netconf framer API.
-    public void write(String message) throws IOException {
+    public void write(ByteBuf message) throws IOException {
         synchronized (lock) {
-            if (stdIn == null) throw new IllegalStateException("StdIn not available");
+            if (stdIn == null) {
+                postponned.add(message);
+                return;
+            }
+            writeImpl(message);
         }
-        stdIn.write(message.getBytes());
+    }
+
+    private void writeImpl(ByteBuf message) throws IOException {
+        message.getBytes(0, stdIn, message.readableBytes());
         stdIn.flush();
     }
 
@@ -98,8 +119,8 @@ public class SshClientAdapter implements Runnable {
     }
 
     public void start(ChannelHandlerContext ctx) {
-        if(this.ctx != null) return; // context is already associated.
-
+        if (this.ctx != null)
+            return; // context is already associated.
         this.ctx = ctx;
         new Thread(this).start();
     }