Merge "Bug 809: Enhancements to the toaster example"
[controller.git] / opendaylight / netconf / netconf-util / src / main / java / org / opendaylight / controller / netconf / util / handler / ssh / client / SshClientAdapter.java
index a50462e40dae1e3c5ce40afd451713992ad28e2a..244bcc0041c963988aa8fc78c712767fc7c643c2 100644 (file)
@@ -12,24 +12,30 @@ import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelPromise;
+
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.util.LinkedList;
+import java.util.Queue;
 import java.util.concurrent.atomic.AtomicBoolean;
 import org.opendaylight.controller.netconf.util.handler.ssh.virtualsocket.VirtualSocketException;
 
+
 /**
- * Worker thread class. Handles all downstream and upstream events in SSH Netty pipeline.
+ * Worker thread class. Handles all downstream and upstream events in SSH Netty
+ * pipeline.
  */
 public class SshClientAdapter implements Runnable {
+    private static final int BUFFER_SIZE = 1024;
+
     private final SshClient sshClient;
     private final Invoker invoker;
 
-    private SshSession session;
-    private InputStream stdOut;
-    private InputStream stdErr;
     private OutputStream stdIn;
 
+    private Queue<ByteBuf> postponed = new LinkedList<>();
+
     private ChannelHandlerContext ctx;
     private ChannelPromise disconnectPromise;
 
@@ -37,28 +43,33 @@ public class SshClientAdapter implements Runnable {
 
     private final Object lock = new Object();
 
-    public SshClientAdapter(SshClient sshClient,
-                            Invoker invoker) {
+    public SshClientAdapter(SshClient sshClient, Invoker invoker) {
         this.sshClient = sshClient;
         this.invoker = invoker;
     }
 
     public void run() {
         try {
-            session = sshClient.openSession();
+            SshSession session = sshClient.openSession();
             invoker.invoke(session);
+            InputStream stdOut = session.getStdout();
+            session.getStderr();
 
-            stdOut = session.getStdout();
-            stdErr = session.getStderr();
+            synchronized (lock) {
 
-            synchronized(lock) {
                 stdIn = session.getStdin();
+                ByteBuf message;
+                while ((message = postponed.poll()) != null) {
+                    writeImpl(message);
+                }
             }
 
-            while (stopRequested.get() == false) {
-                byte[] readBuff = new byte[1024];
+            while (!stopRequested.get()) {
+                byte[] readBuff = new byte[BUFFER_SIZE];
                 int c = stdOut.read(readBuff);
-
+                if (c == -1) {
+                    continue;
+                }
                 byte[] tranBuff = new byte[c];
                 System.arraycopy(readBuff, 0, tranBuff, 0, c);
 
@@ -71,22 +82,31 @@ public class SshClientAdapter implements Runnable {
             // Netty closed connection prematurely.
             // Just pass and move on.
         } catch (Exception e) {
-            throw new RuntimeException(e);
+            throw new IllegalStateException(e);
         } finally {
             sshClient.close();
 
             synchronized (lock) {
-                if(disconnectPromise != null) ctx.disconnect(disconnectPromise);
+                if (disconnectPromise != null) {
+                    ctx.disconnect(disconnectPromise);
+                }
             }
         }
     }
 
     // TODO: needs rework to match netconf framer API.
-    public void write(String message) throws IOException {
+    public void write(ByteBuf message) throws IOException {
         synchronized (lock) {
-            if (stdIn == null) throw new IllegalStateException("StdIn not available");
+            if (stdIn == null) {
+                postponed.add(message);
+                return;
+            }
+            writeImpl(message);
         }
-        stdIn.write(message.getBytes());
+    }
+
+    private void writeImpl(ByteBuf message) throws IOException {
+        message.getBytes(0, stdIn, message.readableBytes());
         stdIn.flush();
     }
 
@@ -98,8 +118,10 @@ public class SshClientAdapter implements Runnable {
     }
 
     public void start(ChannelHandlerContext ctx) {
-        if(this.ctx != null) return; // context is already associated.
-
+        if (this.ctx != null) {
+            // context is already associated.
+            return;
+        }
         this.ctx = ctx;
         new Thread(this).start();
     }