BUG-1612 Implement mina ssh netconf server endpoint
[controller.git] / opendaylight / netconf / netconf-netty-util / src / main / java / org / opendaylight / controller / netconf / nettyutil / handler / ssh / client / AsyncSshHandlerReader.java
@@ -8,9 +8,8 @@
 
 package org.opendaylight.controller.netconf.nettyutil.handler.ssh.client;
 
+import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelOutboundHandler;
 import org.apache.sshd.common.future.SshFutureListener;
 import org.apache.sshd.common.io.IoInputStream;
 import org.apache.sshd.common.io.IoReadFuture;
@@ -22,22 +21,24 @@ import org.slf4j.LoggerFactory;
  * Listener on async input stream from SSH session.
  * This listeners schedules reads in a loop until the session is closed or read fails.
  */
-final class AsyncSshHanderReader implements SshFutureListener<IoReadFuture>, AutoCloseable {
+public final class AsyncSshHandlerReader implements SshFutureListener<IoReadFuture>, AutoCloseable {
 
     private static final Logger logger = LoggerFactory.getLogger(AsyncSshHandler.class);
 
     private static final int BUFFER_SIZE = 8192;
 
-    private final ChannelOutboundHandler asyncSshHandler;
-    private final ChannelHandlerContext ctx;
+    private final AutoCloseable connectionClosedCallback;
+    private final ReadMsgHandler readHandler;
 
+    private final String channelId;
     private IoInputStream asyncOut;
     private Buffer buf;
     private IoReadFuture currentReadFuture;
 
-    public AsyncSshHanderReader(final ChannelOutboundHandler asyncSshHandler, final ChannelHandlerContext ctx, final IoInputStream asyncOut) {
-        this.asyncSshHandler = asyncSshHandler;
-        this.ctx = ctx;
+    public AsyncSshHandlerReader(final AutoCloseable connectionClosedCallback, final ReadMsgHandler readHandler, final String channelId, final IoInputStream asyncOut) {
+        this.connectionClosedCallback = connectionClosedCallback;
+        this.readHandler = readHandler;
+        this.channelId = channelId;
         this.asyncOut = asyncOut;
         buf = new Buffer(BUFFER_SIZE);
         asyncOut.read(buf).addListener(this);
@@ -48,16 +49,20 @@ final class AsyncSshHanderReader implements SshFutureListener<IoReadFuture>, Aut
         if(future.getException() != null) {
             if(asyncOut.isClosed() || asyncOut.isClosing()) {
                 // Ssh dropped
-                logger.debug("Ssh session dropped on channel: {}", ctx.channel(), future.getException());
+                logger.debug("Ssh session dropped on channel: {}", channelId, future.getException());
             } else {
-                logger.warn("Exception while reading from SSH remote on channel {}", ctx.channel(), future.getException());
+                logger.warn("Exception while reading from SSH remote on channel {}", channelId, future.getException());
             }
             invokeDisconnect();
             return;
         }
 
         if (future.getRead() > 0) {
-            ctx.fireChannelRead(Unpooled.wrappedBuffer(buf.array(), 0, future.getRead()));
+            final ByteBuf msg = Unpooled.wrappedBuffer(buf.array(), 0, future.getRead());
+            if(logger.isTraceEnabled()) {
+                logger.trace("Reading message on channel: {}, message: {}", channelId, AsyncSshHandlerWriter.byteBufToString(msg));
+            }
+            readHandler.onMessageRead(msg);
 
             // Schedule next read
             buf = new Buffer(BUFFER_SIZE);
@@ -68,7 +73,7 @@ final class AsyncSshHanderReader implements SshFutureListener<IoReadFuture>, Aut
 
     private void invokeDisconnect() {
         try {
-            asyncSshHandler.disconnect(ctx, ctx.newPromise());
+            connectionClosedCallback.close();
         } catch (final Exception e) {
             // This should not happen
             throw new IllegalStateException(e);
@@ -80,8 +85,14 @@ final class AsyncSshHanderReader implements SshFutureListener<IoReadFuture>, Aut
         // Remove self as listener on close to prevent reading from closed input
         if(currentReadFuture != null) {
             currentReadFuture.removeListener(this);
+            currentReadFuture = null;
         }
 
         asyncOut = null;
     }
+
+    public interface ReadMsgHandler {
+
+        void onMessageRead(ByteBuf msg);
+    }
 }