BUG-1842 Fix byte buffer handling for pending messages
[controller.git] / opendaylight / netconf / netconf-netty-util / src / main / java / org / opendaylight / controller / netconf / nettyutil / handler / ssh / client / AsyncSshHandler.java
index 2761a45d03bedc8730cb69036748c266cdc7f412..3d1e4784f2ded5677472e3a25d99adaa11e24cf9 100644 (file)
@@ -12,6 +12,7 @@ import com.google.common.base.Preconditions;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
 import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelOutboundHandler;
 import io.netty.channel.ChannelOutboundHandlerAdapter;
 import io.netty.channel.ChannelPromise;
 import java.io.IOException;
@@ -25,7 +26,10 @@ import org.apache.sshd.client.future.OpenFuture;
 import org.apache.sshd.common.future.CloseFuture;
 import org.apache.sshd.common.future.SshFutureListener;
 import org.apache.sshd.common.io.IoInputStream;
+import org.apache.sshd.common.io.IoOutputStream;
 import org.apache.sshd.common.io.IoReadFuture;
+import org.apache.sshd.common.io.IoWriteFuture;
+import org.apache.sshd.common.io.WritePendingException;
 import org.apache.sshd.common.util.Buffer;
 import org.opendaylight.controller.netconf.nettyutil.handler.ssh.authentication.AuthenticationHandler;
 import org.slf4j.Logger;
@@ -53,10 +57,13 @@ public class AsyncSshHandler extends ChannelOutboundHandlerAdapter {
     private final SshClient sshClient;
 
     private SshReadAsyncListener sshReadAsyncListener;
+    private SshWriteAsyncHandler sshWriteAsyncHandler;
+
     private ClientChannel channel;
     private ClientSession session;
     private ChannelPromise connectPromise;
 
+
     public static AsyncSshHandler createForNetconfSubsystem(final AuthenticationHandler authenticationHandler) throws IOException {
         return new AsyncSshHandler(authenticationHandler, DEFAULT_CLIENT);
     }
@@ -139,10 +146,13 @@ public class AsyncSshHandler extends ChannelOutboundHandlerAdapter {
 
         connectPromise.setSuccess();
         connectPromise = null;
-        ctx.fireChannelActive();
 
-        final IoInputStream asyncOut = channel.getAsyncOut();
-        sshReadAsyncListener = new SshReadAsyncListener(ctx, asyncOut);
+        sshReadAsyncListener = new SshReadAsyncListener(this, ctx, channel.getAsyncOut());
+        // if readAsyncListener receives immediate close, it will close this handler and closing this handler sets channel variable to null
+        if(channel != null) {
+            sshWriteAsyncHandler = new SshWriteAsyncHandler(this, channel.getAsyncIn());
+            ctx.fireChannelActive();
+        }
     }
 
     private synchronized void handleSshSetupFailure(final ChannelHandlerContext ctx, final Throwable e) {
@@ -154,31 +164,7 @@ public class AsyncSshHandler extends ChannelOutboundHandlerAdapter {
 
     @Override
     public synchronized void write(final ChannelHandlerContext ctx, final Object msg, final ChannelPromise promise) {
-        try {
-            if(channel.getAsyncIn().isClosed() || channel.getAsyncIn().isClosing()) {
-                handleSshSessionClosed(ctx);
-            } else {
-                channel.getAsyncIn().write(toBuffer(msg));
-                ((ByteBuf) msg).release();
-            }
-        } catch (final Exception e) {
-            logger.warn("Exception while writing to SSH remote on channel {}", ctx.channel(), e);
-            throw new IllegalStateException("Exception while writing to SSH remote on channel " + ctx.channel(),e);
-        }
-    }
-
-    private static void handleSshSessionClosed(final ChannelHandlerContext ctx) {
-        logger.debug("SSH session closed on channel: {}", ctx.channel());
-        ctx.fireChannelInactive();
-    }
-
-    private Buffer toBuffer(final Object msg) {
-        // TODO Buffer vs ByteBuf translate, Can we handle that better ?
-        Preconditions.checkState(msg instanceof ByteBuf);
-        final ByteBuf byteBuf = (ByteBuf) msg;
-        final byte[] temp = new byte[byteBuf.readableBytes()];
-        byteBuf.readBytes(temp, 0, byteBuf.readableBytes());
-        return new Buffer(temp);
+        sshWriteAsyncHandler.write(ctx, msg, promise);
     }
 
     @Override
@@ -193,22 +179,32 @@ public class AsyncSshHandler extends ChannelOutboundHandlerAdapter {
     }
 
     @Override
-    public synchronized void disconnect(final ChannelHandlerContext ctx, final ChannelPromise promise) throws Exception {
+    public synchronized void disconnect(final ChannelHandlerContext ctx, final ChannelPromise promise) {
         if(sshReadAsyncListener != null) {
             sshReadAsyncListener.close();
         }
 
-        session.close(false).addListener(new SshFutureListener<CloseFuture>() {
-            @Override
-            public void operationComplete(final CloseFuture future) {
-                if(future.isClosed() == false) {
-                    session.close(true);
+        if(sshWriteAsyncHandler != null) {
+            sshWriteAsyncHandler.close();
+        }
+
+        if(session!= null && !session.isClosed() && !session.isClosing()) {
+            session.close(false).addListener(new SshFutureListener<CloseFuture>() {
+                @Override
+                public void operationComplete(final CloseFuture future) {
+                    if (future.isClosed() == false) {
+                        session.close(true);
+                    }
+                    session = null;
                 }
-                session = null;
-            }
-        });
+            });
+        }
 
         channel = null;
+        promise.setSuccess();
+
+        logger.debug("SSH session closed on channel: {}", ctx.channel());
+        ctx.fireChannelInactive();
     }
 
     /**
@@ -218,13 +214,15 @@ public class AsyncSshHandler extends ChannelOutboundHandlerAdapter {
     private static class SshReadAsyncListener implements SshFutureListener<IoReadFuture>, AutoCloseable {
         private static final int BUFFER_SIZE = 8192;
 
+        private final ChannelOutboundHandler asyncSshHandler;
         private final ChannelHandlerContext ctx;
 
         private IoInputStream asyncOut;
         private Buffer buf;
         private IoReadFuture currentReadFuture;
 
-        public SshReadAsyncListener(final ChannelHandlerContext ctx, final IoInputStream asyncOut) {
+        public SshReadAsyncListener(final ChannelOutboundHandler asyncSshHandler, final ChannelHandlerContext ctx, final IoInputStream asyncOut) {
+            this.asyncSshHandler = asyncSshHandler;
             this.ctx = ctx;
             this.asyncOut = asyncOut;
             buf = new Buffer(BUFFER_SIZE);
@@ -234,14 +232,14 @@ public class AsyncSshHandler extends ChannelOutboundHandlerAdapter {
         @Override
         public synchronized void operationComplete(final IoReadFuture future) {
             if(future.getException() != null) {
-
                 if(asyncOut.isClosed() || asyncOut.isClosing()) {
-                    // We are closing
-                    handleSshSessionClosed(ctx);
+                    // Ssh dropped
+                    logger.debug("Ssh session dropped on channel: {}", ctx.channel(), future.getException());
                 } else {
                     logger.warn("Exception while reading from SSH remote on channel {}", ctx.channel(), future.getException());
-                    throw new IllegalStateException("Exception while reading from SSH remote on channel " + ctx.channel(), future.getException());
                 }
+                invokeDisconnect();
+                return;
             }
 
             if (future.getRead() > 0) {
@@ -254,8 +252,17 @@ public class AsyncSshHandler extends ChannelOutboundHandlerAdapter {
             }
         }
 
+        private void invokeDisconnect() {
+            try {
+                asyncSshHandler.disconnect(ctx, ctx.newPromise());
+            } catch (final Exception e) {
+                // This should not happen
+                throw new IllegalStateException(e);
+            }
+        }
+
         @Override
-        public synchronized void close() throws Exception {
+        public synchronized void close() {
             // Remove self as listener on close to prevent reading from closed input
             if(currentReadFuture != null) {
                 currentReadFuture.removeListener(this);
@@ -264,4 +271,114 @@ public class AsyncSshHandler extends ChannelOutboundHandlerAdapter {
             asyncOut = null;
         }
     }
+
+    private static final class SshWriteAsyncHandler implements AutoCloseable {
+        public static final int MAX_PENDING_WRITES = 100;
+
+        private final ChannelOutboundHandler channelHandler;
+        private IoOutputStream asyncIn;
+
+        // Counter that holds the amount of pending write messages
+        // Pending write can occur in case remote window is full
+        // In such case, we need to wait for the pending write to finish
+        private int pendingWriteCounter;
+        // Last write future, that can be pending
+        private IoWriteFuture lastWriteFuture;
+
+        public SshWriteAsyncHandler(final ChannelOutboundHandler channelHandler, final IoOutputStream asyncIn) {
+            this.channelHandler = channelHandler;
+            this.asyncIn = asyncIn;
+        }
+
+        int c = 0;
+
+        public synchronized void write(final ChannelHandlerContext ctx, final Object msg, final ChannelPromise promise) {
+            try {
+                if(asyncIn == null || asyncIn.isClosed() || asyncIn.isClosing()) {
+                    // If we are closed/closing, set immediate fail
+                    promise.setFailure(new IllegalStateException("Channel closed"));
+                } else {
+                    lastWriteFuture = asyncIn.write(toBuffer(msg));
+                    lastWriteFuture.addListener(new SshFutureListener<IoWriteFuture>() {
+
+                        @Override
+                        public void operationComplete(final IoWriteFuture future) {
+                            ((ByteBuf) msg).release();
+
+                            // Notify success or failure
+                            if (future.isWritten()) {
+                                promise.setSuccess();
+                            } else {
+                                promise.setFailure(future.getException());
+                            }
+
+                            // Reset last pending future
+                            synchronized (SshWriteAsyncHandler.this) {
+                                lastWriteFuture = null;
+                            }
+                        }
+                    });
+                }
+            } catch (final WritePendingException e) {
+                // Check limit for pending writes
+                pendingWriteCounter++;
+                if(pendingWriteCounter > MAX_PENDING_WRITES) {
+                    promise.setFailure(e);
+                    handlePendingFailed(ctx, new IllegalStateException("Too much pending writes(" + MAX_PENDING_WRITES + ") on channel: " + ctx.channel() +
+                            ", remote window is not getting read or is too small"));
+                }
+
+                // We need to reset buffer read index, since we've already read it when we tried to write it the first time
+                ((ByteBuf) msg).resetReaderIndex();
+                logger.debug("Write pending to SSH remote on channel: {}, current pending count: {}", ctx.channel(), pendingWriteCounter);
+
+                // In case of pending, re-invoke write after pending is finished
+                Preconditions.checkNotNull(lastWriteFuture, "Write is pending, but there was no previous write attempt", e);
+                lastWriteFuture.addListener(new SshFutureListener<IoWriteFuture>() {
+                    @Override
+                    public void operationComplete(final IoWriteFuture future) {
+                        // FIXME possible minor race condition, we cannot guarantee that this callback when pending is finished will be executed first
+                        // External thread could trigger write on this instance while we are on this line
+                        // Verify
+                        if (future.isWritten()) {
+                            synchronized (SshWriteAsyncHandler.this) {
+                                // Pending done, decrease counter
+                                pendingWriteCounter--;
+                                write(ctx, msg, promise);
+                            }
+                        } else {
+                            // Cannot reschedule pending, fail
+                            handlePendingFailed(ctx, e);
+                        }
+                    }
+
+                });
+            }
+        }
+
+        private void handlePendingFailed(final ChannelHandlerContext ctx, final Exception e) {
+            logger.warn("Exception while writing to SSH remote on channel {}", ctx.channel(), e);
+            try {
+                channelHandler.disconnect(ctx, ctx.newPromise());
+            } catch (final Exception ex) {
+                // This should not happen
+                throw new IllegalStateException(ex);
+            }
+        }
+
+        @Override
+        public void close() {
+            asyncIn = null;
+        }
+
+        private Buffer toBuffer(final Object msg) {
+            // TODO Buffer vs ByteBuf translate, Can we handle that better ?
+            Preconditions.checkState(msg instanceof ByteBuf);
+            final ByteBuf byteBuf = (ByteBuf) msg;
+            final byte[] temp = new byte[byteBuf.readableBytes()];
+            byteBuf.readBytes(temp, 0, byteBuf.readableBytes());
+            return new Buffer(temp);
+        }
+
+    }
 }