BUG-1842 Fix byte buffer handling for pending messages
[controller.git] / opendaylight / netconf / netconf-netty-util / src / main / java / org / opendaylight / controller / netconf / nettyutil / handler / ssh / client / AsyncSshHandler.java
index 0d877c9ec73797010013df229b9101d86445304f..3d1e4784f2ded5677472e3a25d99adaa11e24cf9 100644 (file)
@@ -148,9 +148,11 @@ public class AsyncSshHandler extends ChannelOutboundHandlerAdapter {
         connectPromise = null;
 
         sshReadAsyncListener = new SshReadAsyncListener(this, ctx, channel.getAsyncOut());
-        sshWriteAsyncHandler = new SshWriteAsyncHandler(this, channel.getAsyncIn());
-
-        ctx.fireChannelActive();
+        // if readAsyncListener receives immediate close, it will close this handler and closing this handler sets channel variable to null
+        if(channel != null) {
+            sshWriteAsyncHandler = new SshWriteAsyncHandler(this, channel.getAsyncIn());
+            ctx.fireChannelActive();
+        }
     }
 
     private synchronized void handleSshSetupFailure(final ChannelHandlerContext ctx, final Throwable e) {
@@ -230,17 +232,14 @@ public class AsyncSshHandler extends ChannelOutboundHandlerAdapter {
         @Override
         public synchronized void operationComplete(final IoReadFuture future) {
             if(future.getException() != null) {
-
                 if(asyncOut.isClosed() || asyncOut.isClosing()) {
-
                     // Ssh dropped
                     logger.debug("Ssh session dropped on channel: {}", ctx.channel(), future.getException());
-                    invokeDisconnect();
-                    return;
                 } else {
                     logger.warn("Exception while reading from SSH remote on channel {}", ctx.channel(), future.getException());
-                    invokeDisconnect();
                 }
+                invokeDisconnect();
+                return;
             }
 
             if (future.getRead() > 0) {
@@ -324,22 +323,29 @@ public class AsyncSshHandler extends ChannelOutboundHandlerAdapter {
                 // Check limit for pending writes
                 pendingWriteCounter++;
                 if(pendingWriteCounter > MAX_PENDING_WRITES) {
+                    promise.setFailure(e);
                     handlePendingFailed(ctx, new IllegalStateException("Too much pending writes(" + MAX_PENDING_WRITES + ") on channel: " + ctx.channel() +
                             ", remote window is not getting read or is too small"));
                 }
 
+                // We need to reset buffer read index, since we've already read it when we tried to write it the first time
+                ((ByteBuf) msg).resetReaderIndex();
                 logger.debug("Write pending to SSH remote on channel: {}, current pending count: {}", ctx.channel(), pendingWriteCounter);
 
                 // In case of pending, re-invoke write after pending is finished
+                Preconditions.checkNotNull(lastWriteFuture, "Write is pending, but there was no previous write attempt", e);
                 lastWriteFuture.addListener(new SshFutureListener<IoWriteFuture>() {
                     @Override
                     public void operationComplete(final IoWriteFuture future) {
+                        // FIXME possible minor race condition, we cannot guarantee that this callback when pending is finished will be executed first
+                        // External thread could trigger write on this instance while we are on this line
+                        // Verify
                         if (future.isWritten()) {
                             synchronized (SshWriteAsyncHandler.this) {
                                 // Pending done, decrease counter
                                 pendingWriteCounter--;
+                                write(ctx, msg, promise);
                             }
-                            write(ctx, msg, promise);
                         } else {
                             // Cannot reschedule pending, fail
                             handlePendingFailed(ctx, e);