BUG-1842 Fix byte buffer handling for pending messages
[controller.git] / opendaylight / netconf / netconf-netty-util / src / main / java / org / opendaylight / controller / netconf / nettyutil / handler / ssh / client / AsyncSshHandler.java
index 935cb8dcd06ca966e6c560560d2030150cced460..3d1e4784f2ded5677472e3a25d99adaa11e24cf9 100644 (file)
@@ -147,10 +147,12 @@ public class AsyncSshHandler extends ChannelOutboundHandlerAdapter {
         connectPromise.setSuccess();
         connectPromise = null;
 
-        sshReadAsyncListener = new SshReadAsyncListener(ctx, channel.getAsyncOut());
-        sshWriteAsyncHandler = new SshWriteAsyncHandler(this, channel.getAsyncIn());
-
-        ctx.fireChannelActive();
+        sshReadAsyncListener = new SshReadAsyncListener(this, ctx, channel.getAsyncOut());
+        // if readAsyncListener receives immediate close, it will close this handler and closing this handler sets channel variable to null
+        if(channel != null) {
+            sshWriteAsyncHandler = new SshWriteAsyncHandler(this, channel.getAsyncIn());
+            ctx.fireChannelActive();
+        }
     }
 
     private synchronized void handleSshSetupFailure(final ChannelHandlerContext ctx, final Throwable e) {
@@ -165,11 +167,6 @@ public class AsyncSshHandler extends ChannelOutboundHandlerAdapter {
         sshWriteAsyncHandler.write(ctx, msg, promise);
     }
 
-    private static void handleSshSessionClosed(final ChannelHandlerContext ctx) {
-        logger.debug("SSH session closed on channel: {}", ctx.channel());
-        ctx.fireChannelInactive();
-    }
-
     @Override
     public synchronized void connect(final ChannelHandlerContext ctx, final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) throws Exception {
         this.connectPromise = promise;
@@ -206,7 +203,8 @@ public class AsyncSshHandler extends ChannelOutboundHandlerAdapter {
         channel = null;
         promise.setSuccess();
 
-        handleSshSessionClosed(ctx);
+        logger.debug("SSH session closed on channel: {}", ctx.channel());
+        ctx.fireChannelInactive();
     }
 
     /**
@@ -216,13 +214,15 @@ public class AsyncSshHandler extends ChannelOutboundHandlerAdapter {
     private static class SshReadAsyncListener implements SshFutureListener<IoReadFuture>, AutoCloseable {
         private static final int BUFFER_SIZE = 8192;
 
+        private final ChannelOutboundHandler asyncSshHandler;
         private final ChannelHandlerContext ctx;
 
         private IoInputStream asyncOut;
         private Buffer buf;
         private IoReadFuture currentReadFuture;
 
-        public SshReadAsyncListener(final ChannelHandlerContext ctx, final IoInputStream asyncOut) {
+        public SshReadAsyncListener(final ChannelOutboundHandler asyncSshHandler, final ChannelHandlerContext ctx, final IoInputStream asyncOut) {
+            this.asyncSshHandler = asyncSshHandler;
             this.ctx = ctx;
             this.asyncOut = asyncOut;
             buf = new Buffer(BUFFER_SIZE);
@@ -232,14 +232,14 @@ public class AsyncSshHandler extends ChannelOutboundHandlerAdapter {
         @Override
         public synchronized void operationComplete(final IoReadFuture future) {
             if(future.getException() != null) {
-
                 if(asyncOut.isClosed() || asyncOut.isClosing()) {
-                    // We are closing
-                    handleSshSessionClosed(ctx);
+                    // Ssh dropped
+                    logger.debug("Ssh session dropped on channel: {}", ctx.channel(), future.getException());
                 } else {
                     logger.warn("Exception while reading from SSH remote on channel {}", ctx.channel(), future.getException());
-                    throw new IllegalStateException("Exception while reading from SSH remote on channel " + ctx.channel(), future.getException());
                 }
+                invokeDisconnect();
+                return;
             }
 
             if (future.getRead() > 0) {
@@ -252,6 +252,15 @@ public class AsyncSshHandler extends ChannelOutboundHandlerAdapter {
             }
         }
 
+        private void invokeDisconnect() {
+            try {
+                asyncSshHandler.disconnect(ctx, ctx.newPromise());
+            } catch (final Exception e) {
+                // This should not happen
+                throw new IllegalStateException(e);
+            }
+        }
+
         @Override
         public synchronized void close() {
             // Remove self as listener on close to prevent reading from closed input
@@ -281,10 +290,13 @@ public class AsyncSshHandler extends ChannelOutboundHandlerAdapter {
             this.asyncIn = asyncIn;
         }
 
+        int c = 0;
+
         public synchronized void write(final ChannelHandlerContext ctx, final Object msg, final ChannelPromise promise) {
             try {
-                if(asyncIn.isClosed() || asyncIn.isClosing()) {
-                    handleSshSessionClosed(ctx);
+                if(asyncIn == null || asyncIn.isClosed() || asyncIn.isClosing()) {
+                    // If we are closed/closing, set immediate fail
+                    promise.setFailure(new IllegalStateException("Channel closed"));
                 } else {
                     lastWriteFuture = asyncIn.write(toBuffer(msg));
                     lastWriteFuture.addListener(new SshFutureListener<IoWriteFuture>() {
@@ -296,8 +308,9 @@ public class AsyncSshHandler extends ChannelOutboundHandlerAdapter {
                             // Notify success or failure
                             if (future.isWritten()) {
                                 promise.setSuccess();
+                            } else {
+                                promise.setFailure(future.getException());
                             }
-                            promise.setFailure(future.getException());
 
                             // Reset last pending future
                             synchronized (SshWriteAsyncHandler.this) {
@@ -310,22 +323,29 @@ public class AsyncSshHandler extends ChannelOutboundHandlerAdapter {
                 // Check limit for pending writes
                 pendingWriteCounter++;
                 if(pendingWriteCounter > MAX_PENDING_WRITES) {
+                    promise.setFailure(e);
                     handlePendingFailed(ctx, new IllegalStateException("Too much pending writes(" + MAX_PENDING_WRITES + ") on channel: " + ctx.channel() +
                             ", remote window is not getting read or is too small"));
                 }
 
+                // We need to reset buffer read index, since we've already read it when we tried to write it the first time
+                ((ByteBuf) msg).resetReaderIndex();
                 logger.debug("Write pending to SSH remote on channel: {}, current pending count: {}", ctx.channel(), pendingWriteCounter);
 
                 // In case of pending, re-invoke write after pending is finished
+                Preconditions.checkNotNull(lastWriteFuture, "Write is pending, but there was no previous write attempt", e);
                 lastWriteFuture.addListener(new SshFutureListener<IoWriteFuture>() {
                     @Override
                     public void operationComplete(final IoWriteFuture future) {
-                        if(future.isWritten()) {
+                        // FIXME possible minor race condition, we cannot guarantee that this callback when pending is finished will be executed first
+                        // External thread could trigger write on this instance while we are on this line
+                        // Verify
+                        if (future.isWritten()) {
                             synchronized (SshWriteAsyncHandler.this) {
                                 // Pending done, decrease counter
                                 pendingWriteCounter--;
+                                write(ctx, msg, promise);
                             }
-                            write(ctx, msg, promise);
                         } else {
                             // Cannot reschedule pending, fail
                             handlePendingFailed(ctx, e);