Merge "Fix for possible NPE if Bundle is stopped."
[controller.git] / opendaylight / netconf / netconf-netty-util / src / main / java / org / opendaylight / controller / netconf / nettyutil / handler / ssh / client / SshClientAdapter.java
index cf4ec213c296bbd103dfe75f7650670b008aa041..4ca7bdf9580e40ec328d1057a1230a5c7d6eebec 100644 (file)
@@ -8,8 +8,13 @@
 
 package org.opendaylight.controller.netconf.nettyutil.handler.ssh.client;
 
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelPromise;
 import java.io.IOException;
@@ -18,7 +23,6 @@ import java.io.OutputStream;
 import java.util.LinkedList;
 import java.util.Queue;
 import java.util.concurrent.atomic.AtomicBoolean;
-import org.opendaylight.controller.netconf.nettyutil.handler.ssh.virtualsocket.VirtualSocketException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -27,7 +31,7 @@ import org.slf4j.LoggerFactory;
  * Worker thread class. Handles all downstream and upstream events in SSH Netty
  * pipeline.
  */
-public class SshClientAdapter implements Runnable {
+class SshClientAdapter implements Runnable {
     private static final Logger logger = LoggerFactory.getLogger(SshClientAdapter.class);
 
     private static final int BUFFER_SIZE = 1024;
@@ -37,7 +41,7 @@ public class SshClientAdapter implements Runnable {
 
     private OutputStream stdIn;
 
-    private Queue<ByteBuf> postponed = new LinkedList<>();
+    private final Queue<ByteBuf> postponed = new LinkedList<>();
 
     private ChannelHandlerContext ctx;
     private ChannelPromise disconnectPromise;
@@ -46,52 +50,36 @@ public class SshClientAdapter implements Runnable {
 
     private final Object lock = new Object();
 
-    public SshClientAdapter(SshClient sshClient, Invoker invoker) {
+    public SshClientAdapter(final SshClient sshClient, final Invoker invoker) {
         this.sshClient = sshClient;
         this.invoker = invoker;
     }
 
+    // TODO ganymed spawns a Thread that receives the data from remote inside TransportManager
+    // Get rid of this thread and reuse Ganymed internal thread (not sure if its possible without modifications in ganymed)
     public void run() {
-        SshSession session;
-        try {
-            session = sshClient.openSession();
-        } catch (IOException e) {
-            logger.error("Cannot establish session", e);
-            sshClient.close();
-            return;
-        }
         try {
+            final SshSession session = sshClient.openSession();
             invoker.invoke(session);
-            InputStream stdOut = session.getStdout();
-            session.getStderr();
+            final InputStream stdOut = session.getStdout();
 
             synchronized (lock) {
-
                 stdIn = session.getStdin();
-                ByteBuf message;
-                while ((message = postponed.poll()) != null) {
-                    writeImpl(message);
+                while (postponed.peek() != null) {
+                    writeImpl(postponed.poll());
                 }
             }
 
             while (!stopRequested.get()) {
-                byte[] readBuff = new byte[BUFFER_SIZE];
-                int c = stdOut.read(readBuff);
+                final byte[] readBuff = new byte[BUFFER_SIZE];
+                final int c = stdOut.read(readBuff);
                 if (c == -1) {
                     continue;
                 }
-                byte[] tranBuff = new byte[c];
-                System.arraycopy(readBuff, 0, tranBuff, 0, c);
 
-                ByteBuf byteBuf = Unpooled.buffer(c);
-                byteBuf.writeBytes(tranBuff);
-                ctx.fireChannelRead(byteBuf);
+                ctx.fireChannelRead(Unpooled.copiedBuffer(readBuff, 0, c));
             }
-
-        } catch (VirtualSocketException e) {
-            // Netty closed connection prematurely.
-            // Just pass and move on.
-        } catch (Exception e) {
+        } catch (final Exception e) {
             logger.error("Unexpected exception", e);
         } finally {
             sshClient.close();
@@ -105,7 +93,7 @@ public class SshClientAdapter implements Runnable {
     }
 
     // TODO: needs rework to match netconf framer API.
-    public void write(ByteBuf message) throws IOException {
+    public void write(final ByteBuf message) throws IOException {
         synchronized (lock) {
             if (stdIn == null) {
                 postponed.add(message);
@@ -115,24 +103,36 @@ public class SshClientAdapter implements Runnable {
         }
     }
 
-    private void writeImpl(ByteBuf message) throws IOException {
+    private void writeImpl(final ByteBuf message) throws IOException {
         message.getBytes(0, stdIn, message.readableBytes());
+        message.release();
         stdIn.flush();
     }
 
-    public void stop(ChannelPromise promise) {
+    public void stop(final ChannelPromise promise) {
         synchronized (lock) {
             stopRequested.set(true);
             disconnectPromise = promise;
         }
     }
 
-    public void start(ChannelHandlerContext ctx) {
-        if (this.ctx != null) {
-            // context is already associated.
-            return;
+    public Thread start(final ChannelHandlerContext ctx, final ChannelFuture channelFuture) {
+        checkArgument(channelFuture.isSuccess());
+        checkNotNull(ctx.channel().remoteAddress());
+        synchronized (this) {
+            checkState(this.ctx == null);
+            this.ctx = ctx;
         }
-        this.ctx = ctx;
-        new Thread(this).start();
+        final String threadName = toString();
+        final Thread thread = new Thread(this, threadName);
+        thread.start();
+        return thread;
+    }
+
+    @Override
+    public String toString() {
+        return "SshClientAdapter{" +
+                "sshClient=" + sshClient +
+                '}';
     }
 }