BUG-1568 Getting rid of StreamGobller from netconf ssh client. 03/10103/2
authorMaros Marsalek <mmarsale@cisco.com>
Wed, 20 Aug 2014 13:11:35 +0000 (15:11 +0200)
committerMaros Marsalek <mmarsale@cisco.com>
Thu, 21 Aug 2014 07:31:17 +0000 (07:31 +0000)
We have another thread in SshClientAdapter that works as StreamGobller
http://www.ganymed.ethz.ch/ssh2/FAQ.html#blocking

Change-Id: I3f61b14cb22396d8b5898028bd83f62d98c6866d
Signed-off-by: Maros Marsalek <mmarsale@cisco.com>
opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/controller/netconf/nettyutil/handler/ssh/client/SshClientAdapter.java
opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/controller/netconf/nettyutil/handler/ssh/client/SshSession.java

index 1a2eb3f1ab43d188179351341264c0e46bc52350..4ca7bdf9580e40ec328d1057a1230a5c7d6eebec 100644 (file)
@@ -41,7 +41,7 @@ class SshClientAdapter implements Runnable {
 
     private OutputStream stdIn;
 
-    private Queue<ByteBuf> postponed = new LinkedList<>();
+    private final Queue<ByteBuf> postponed = new LinkedList<>();
 
     private ChannelHandlerContext ctx;
     private ChannelPromise disconnectPromise;
@@ -50,42 +50,36 @@ class SshClientAdapter implements Runnable {
 
     private final Object lock = new Object();
 
-    public SshClientAdapter(SshClient sshClient, Invoker invoker) {
+    public SshClientAdapter(final SshClient sshClient, final Invoker invoker) {
         this.sshClient = sshClient;
         this.invoker = invoker;
     }
 
-    // TODO: refactor
+    // TODO ganymed spawns a Thread that receives the data from remote inside TransportManager
+    // Get rid of this thread and reuse Ganymed internal thread (not sure if its possible without modifications in ganymed)
     public void run() {
         try {
-            SshSession session = sshClient.openSession();
+            final SshSession session = sshClient.openSession();
             invoker.invoke(session);
-            InputStream stdOut = session.getStdout();
-            session.getStderr();
+            final InputStream stdOut = session.getStdout();
 
             synchronized (lock) {
-
                 stdIn = session.getStdin();
-                ByteBuf message;
-                while ((message = postponed.poll()) != null) {
-                    writeImpl(message);
+                while (postponed.peek() != null) {
+                    writeImpl(postponed.poll());
                 }
             }
 
             while (!stopRequested.get()) {
-                byte[] readBuff = new byte[BUFFER_SIZE];
-                int c = stdOut.read(readBuff);
+                final byte[] readBuff = new byte[BUFFER_SIZE];
+                final int c = stdOut.read(readBuff);
                 if (c == -1) {
                     continue;
                 }
-                byte[] tranBuff = new byte[c];
-                System.arraycopy(readBuff, 0, tranBuff, 0, c);
 
-                ByteBuf byteBuf = Unpooled.buffer(c);
-                byteBuf.writeBytes(tranBuff);
-                ctx.fireChannelRead(byteBuf);
+                ctx.fireChannelRead(Unpooled.copiedBuffer(readBuff, 0, c));
             }
-        } catch (Exception e) {
+        } catch (final Exception e) {
             logger.error("Unexpected exception", e);
         } finally {
             sshClient.close();
@@ -99,7 +93,7 @@ class SshClientAdapter implements Runnable {
     }
 
     // TODO: needs rework to match netconf framer API.
-    public void write(ByteBuf message) throws IOException {
+    public void write(final ByteBuf message) throws IOException {
         synchronized (lock) {
             if (stdIn == null) {
                 postponed.add(message);
@@ -109,28 +103,28 @@ class SshClientAdapter implements Runnable {
         }
     }
 
-    private void writeImpl(ByteBuf message) throws IOException {
+    private void writeImpl(final ByteBuf message) throws IOException {
         message.getBytes(0, stdIn, message.readableBytes());
         message.release();
         stdIn.flush();
     }
 
-    public void stop(ChannelPromise promise) {
+    public void stop(final ChannelPromise promise) {
         synchronized (lock) {
             stopRequested.set(true);
             disconnectPromise = promise;
         }
     }
 
-    public Thread start(ChannelHandlerContext ctx, ChannelFuture channelFuture) {
+    public Thread start(final ChannelHandlerContext ctx, final ChannelFuture channelFuture) {
         checkArgument(channelFuture.isSuccess());
         checkNotNull(ctx.channel().remoteAddress());
         synchronized (this) {
             checkState(this.ctx == null);
             this.ctx = ctx;
         }
-        String threadName = toString();
-        Thread thread = new Thread(this, threadName);
+        final String threadName = toString();
+        final Thread thread = new Thread(this, threadName);
         thread.start();
         return thread;
     }
index 44893b879431fe738e40e91ea26c88c7b9c6009d..9cdc5926f042fcdc30129257597113c532581e9e 100644 (file)
@@ -9,8 +9,6 @@
 package org.opendaylight.controller.netconf.nettyutil.handler.ssh.client;
 
 import ch.ethz.ssh2.Session;
-import ch.ethz.ssh2.StreamGobbler;
-
 import ch.ethz.ssh2.channel.Channel;
 import java.io.Closeable;
 import java.io.IOException;
@@ -23,19 +21,20 @@ import java.io.OutputStream;
 class SshSession implements Closeable {
     private final Session session;
 
-    public SshSession(Session session) {
+    public SshSession(final Session session) {
         this.session = session;
     }
 
-
-    public void startSubSystem(String name) throws IOException {
+    public void startSubSystem(final String name) throws IOException {
         session.startSubSystem(name);
     }
 
     public InputStream getStdout() {
-        return new StreamGobbler(session.getStdout());
+        return session.getStdout();
     }
 
+    // FIXME according to http://www.ganymed.ethz.ch/ssh2/FAQ.html#blocking you should read data from both stdout and stderr to prevent window filling up (since stdout and stderr share a window)
+    // FIXME stdErr is not used anywhere
     public InputStream getStderr() {
         return session.getStderr();
     }