Merge "Fix channelInactive event handling in the netty pipeline for netconf."
[controller.git] / opendaylight / netconf / netconf-netty-util / src / main / java / org / opendaylight / controller / netconf / nettyutil / handler / ssh / client / AsyncSshHandler.java
index 935cb8dcd06ca966e6c560560d2030150cced460..05cd598cdc22f7b1265c661c447b850cab1a0256 100644 (file)
@@ -10,9 +10,7 @@ package org.opendaylight.controller.netconf.nettyutil.handler.ssh.client;
 
 import com.google.common.base.Preconditions;
 import io.netty.buffer.ByteBuf;
-import io.netty.buffer.Unpooled;
 import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelOutboundHandler;
 import io.netty.channel.ChannelOutboundHandlerAdapter;
 import io.netty.channel.ChannelPromise;
 import java.io.IOException;
@@ -25,12 +23,6 @@ import org.apache.sshd.client.future.ConnectFuture;
 import org.apache.sshd.client.future.OpenFuture;
 import org.apache.sshd.common.future.CloseFuture;
 import org.apache.sshd.common.future.SshFutureListener;
-import org.apache.sshd.common.io.IoInputStream;
-import org.apache.sshd.common.io.IoOutputStream;
-import org.apache.sshd.common.io.IoReadFuture;
-import org.apache.sshd.common.io.IoWriteFuture;
-import org.apache.sshd.common.io.WritePendingException;
-import org.apache.sshd.common.util.Buffer;
 import org.opendaylight.controller.netconf.nettyutil.handler.ssh.authentication.AuthenticationHandler;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -40,7 +32,7 @@ import org.slf4j.LoggerFactory;
  */
 public class AsyncSshHandler extends ChannelOutboundHandlerAdapter {
 
-    private static final Logger logger = LoggerFactory.getLogger(AsyncSshHandler.class);
+    private static final Logger LOG = LoggerFactory.getLogger(AsyncSshHandler.class);
     public static final String SUBSYSTEM = "netconf";
 
     public static final SshClient DEFAULT_CLIENT = SshClient.setUpDefaultClient();
@@ -56,8 +48,8 @@ public class AsyncSshHandler extends ChannelOutboundHandlerAdapter {
     private final AuthenticationHandler authenticationHandler;
     private final SshClient sshClient;
 
-    private SshReadAsyncListener sshReadAsyncListener;
-    private SshWriteAsyncHandler sshWriteAsyncHandler;
+    private AsyncSshHandlerReader sshReadAsyncListener;
+    private AsyncSshHandlerWriter sshWriteAsyncHandler;
 
     private ClientChannel channel;
     private ClientSession session;
@@ -82,7 +74,7 @@ public class AsyncSshHandler extends ChannelOutboundHandlerAdapter {
     }
 
     private void startSsh(final ChannelHandlerContext ctx, final SocketAddress address) {
-        logger.debug("Starting SSH to {} on channel: {}", address, ctx.channel());
+        LOG.debug("Starting SSH to {} on channel: {}", address, ctx.channel());
 
         final ConnectFuture sshConnectionFuture = sshClient.connect(authenticationHandler.getUsername(), address);
         sshConnectionFuture.addListener(new SshFutureListener<ConnectFuture>() {
@@ -99,7 +91,7 @@ public class AsyncSshHandler extends ChannelOutboundHandlerAdapter {
 
     private synchronized void handleSshSessionCreated(final ConnectFuture future, final ChannelHandlerContext ctx) {
         try {
-            logger.trace("SSH session created on channel: {}", ctx.channel());
+            LOG.trace("SSH session created on channel: {}", ctx.channel());
 
             session = future.getSession();
             final AuthFuture authenticateFuture = authenticationHandler.authenticate(session);
@@ -120,7 +112,7 @@ public class AsyncSshHandler extends ChannelOutboundHandlerAdapter {
 
     private synchronized void handleSshAuthenticated(final ClientSession session, final ChannelHandlerContext ctx) {
         try {
-            logger.debug("SSH session authenticated on channel: {}, server version: {}", ctx.channel(), session.getServerVersion());
+            LOG.debug("SSH session authenticated on channel: {}, server version: {}", ctx.channel(), session.getServerVersion());
 
             channel = session.createSubsystemChannel(SUBSYSTEM);
             channel.setStreaming(ClientChannel.Streaming.Async);
@@ -142,22 +134,39 @@ public class AsyncSshHandler extends ChannelOutboundHandlerAdapter {
     }
 
     private synchronized void handleSshChanelOpened(final ChannelHandlerContext ctx) {
-        logger.trace("SSH subsystem channel opened successfully on channel: {}", ctx.channel());
+        LOG.trace("SSH subsystem channel opened successfully on channel: {}", ctx.channel());
 
         connectPromise.setSuccess();
-        connectPromise = null;
 
-        sshReadAsyncListener = new SshReadAsyncListener(ctx, channel.getAsyncOut());
-        sshWriteAsyncHandler = new SshWriteAsyncHandler(this, channel.getAsyncIn());
+        // TODO we should also read from error stream and at least log from that
 
-        ctx.fireChannelActive();
+        sshReadAsyncListener = new AsyncSshHandlerReader(new AutoCloseable() {
+            @Override
+            public void close() throws Exception {
+                AsyncSshHandler.this.disconnect(ctx, ctx.newPromise());
+            }
+        }, new AsyncSshHandlerReader.ReadMsgHandler() {
+            @Override
+            public void onMessageRead(final ByteBuf msg) {
+                ctx.fireChannelRead(msg);
+            }
+        }, channel.toString(), channel.getAsyncOut());
+
+        // if readAsyncListener receives immediate close, it will close this handler and closing this handler sets channel variable to null
+        if(channel != null) {
+            sshWriteAsyncHandler = new AsyncSshHandlerWriter(channel.getAsyncIn());
+            ctx.fireChannelActive();
+        }
     }
 
     private synchronized void handleSshSetupFailure(final ChannelHandlerContext ctx, final Throwable e) {
-        logger.warn("Unable to setup SSH connection on channel: {}", ctx.channel(), e);
-        connectPromise.setFailure(e);
-        connectPromise = null;
-        throw new IllegalStateException("Unable to setup SSH connection on channel: " + ctx.channel(), e);
+        LOG.warn("Unable to setup SSH connection on channel: {}", ctx.channel(), e);
+        disconnect(ctx, ctx.newPromise());
+
+        // If the promise is not yet done, we have failed with initial connect and set connectPromise to failure
+        if(!connectPromise.isDone()) {
+            connectPromise.setFailure(e);
+        }
     }
 
     @Override
@@ -165,13 +174,9 @@ public class AsyncSshHandler extends ChannelOutboundHandlerAdapter {
         sshWriteAsyncHandler.write(ctx, msg, promise);
     }
 
-    private static void handleSshSessionClosed(final ChannelHandlerContext ctx) {
-        logger.debug("SSH session closed on channel: {}", ctx.channel());
-        ctx.fireChannelInactive();
-    }
-
     @Override
     public synchronized void connect(final ChannelHandlerContext ctx, final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) throws Exception {
+        LOG.debug("XXX session connecting on channel {}. promise: {} ", ctx.channel(), connectPromise);
         this.connectPromise = promise;
         startSsh(ctx, remoteAddress);
     }
@@ -183,14 +188,21 @@ public class AsyncSshHandler extends ChannelOutboundHandlerAdapter {
 
     @Override
     public synchronized void disconnect(final ChannelHandlerContext ctx, final ChannelPromise promise) {
-        if(sshReadAsyncListener != null) {
-            sshReadAsyncListener.close();
+        LOG.trace("Closing SSH session on channel: {} with connect promise in state: {}", ctx.channel(), connectPromise);
+
+        // If we have already succeeded and the session was dropped after, we need to fire inactive to notify reconnect logic
+        if(connectPromise.isSuccess()) {
+            ctx.fireChannelInactive();
         }
 
         if(sshWriteAsyncHandler != null) {
             sshWriteAsyncHandler.close();
         }
 
+        if(sshReadAsyncListener != null) {
+            sshReadAsyncListener.close();
+        }
+
         if(session!= null && !session.isClosed() && !session.isClosing()) {
             session.close(false).addListener(new SshFutureListener<CloseFuture>() {
                 @Override
@@ -203,162 +215,19 @@ public class AsyncSshHandler extends ChannelOutboundHandlerAdapter {
             });
         }
 
-        channel = null;
-        promise.setSuccess();
-
-        handleSshSessionClosed(ctx);
-    }
-
-    /**
-     * Listener over async input stream from SSH session.
-     * This listeners schedules reads in a loop until the session is closed or read fails.
-     */
-    private static class SshReadAsyncListener implements SshFutureListener<IoReadFuture>, AutoCloseable {
-        private static final int BUFFER_SIZE = 8192;
-
-        private final ChannelHandlerContext ctx;
-
-        private IoInputStream asyncOut;
-        private Buffer buf;
-        private IoReadFuture currentReadFuture;
-
-        public SshReadAsyncListener(final ChannelHandlerContext ctx, final IoInputStream asyncOut) {
-            this.ctx = ctx;
-            this.asyncOut = asyncOut;
-            buf = new Buffer(BUFFER_SIZE);
-            asyncOut.read(buf).addListener(this);
-        }
-
-        @Override
-        public synchronized void operationComplete(final IoReadFuture future) {
-            if(future.getException() != null) {
-
-                if(asyncOut.isClosed() || asyncOut.isClosing()) {
-                    // We are closing
-                    handleSshSessionClosed(ctx);
-                } else {
-                    logger.warn("Exception while reading from SSH remote on channel {}", ctx.channel(), future.getException());
-                    throw new IllegalStateException("Exception while reading from SSH remote on channel " + ctx.channel(), future.getException());
-                }
-            }
-
-            if (future.getRead() > 0) {
-                ctx.fireChannelRead(Unpooled.wrappedBuffer(buf.array(), 0, future.getRead()));
-
-                // Schedule next read
-                buf = new Buffer(BUFFER_SIZE);
-                currentReadFuture = asyncOut.read(buf);
-                currentReadFuture.addListener(this);
-            }
+        // Super disconnect is necessary in this case since we are using NioSocketChannel and it needs to cleanup its resources
+        // e.g. Socket that it tries to open in its constructor (https://bugs.opendaylight.org/show_bug.cgi?id=2430)
+        // TODO better solution would be to implement custom ChannelFactory + Channel that will use mina SSH lib internally: port this to custom channel implementation
+        try {
+            // Disconnect has to be closed after inactive channel event was fired, because it interferes with it
+            super.disconnect(ctx, ctx.newPromise());
+        } catch (final Exception e) {
+            LOG.warn("Unable to cleanup all resources for channel: {}. Ignoring.", ctx.channel(), e);
         }
 
-        @Override
-        public synchronized void close() {
-            // Remove self as listener on close to prevent reading from closed input
-            if(currentReadFuture != null) {
-                currentReadFuture.removeListener(this);
-            }
-
-            asyncOut = null;
-        }
+        channel = null;
+        promise.setSuccess();
+        LOG.debug("SSH session closed on channel: {}", ctx.channel());
     }
 
-    private static final class SshWriteAsyncHandler implements AutoCloseable {
-        public static final int MAX_PENDING_WRITES = 100;
-
-        private final ChannelOutboundHandler channelHandler;
-        private IoOutputStream asyncIn;
-
-        // Counter that holds the amount of pending write messages
-        // Pending write can occur in case remote window is full
-        // In such case, we need to wait for the pending write to finish
-        private int pendingWriteCounter;
-        // Last write future, that can be pending
-        private IoWriteFuture lastWriteFuture;
-
-        public SshWriteAsyncHandler(final ChannelOutboundHandler channelHandler, final IoOutputStream asyncIn) {
-            this.channelHandler = channelHandler;
-            this.asyncIn = asyncIn;
-        }
-
-        public synchronized void write(final ChannelHandlerContext ctx, final Object msg, final ChannelPromise promise) {
-            try {
-                if(asyncIn.isClosed() || asyncIn.isClosing()) {
-                    handleSshSessionClosed(ctx);
-                } else {
-                    lastWriteFuture = asyncIn.write(toBuffer(msg));
-                    lastWriteFuture.addListener(new SshFutureListener<IoWriteFuture>() {
-
-                        @Override
-                        public void operationComplete(final IoWriteFuture future) {
-                            ((ByteBuf) msg).release();
-
-                            // Notify success or failure
-                            if (future.isWritten()) {
-                                promise.setSuccess();
-                            }
-                            promise.setFailure(future.getException());
-
-                            // Reset last pending future
-                            synchronized (SshWriteAsyncHandler.this) {
-                                lastWriteFuture = null;
-                            }
-                        }
-                    });
-                }
-            } catch (final WritePendingException e) {
-                // Check limit for pending writes
-                pendingWriteCounter++;
-                if(pendingWriteCounter > MAX_PENDING_WRITES) {
-                    handlePendingFailed(ctx, new IllegalStateException("Too much pending writes(" + MAX_PENDING_WRITES + ") on channel: " + ctx.channel() +
-                            ", remote window is not getting read or is too small"));
-                }
-
-                logger.debug("Write pending to SSH remote on channel: {}, current pending count: {}", ctx.channel(), pendingWriteCounter);
-
-                // In case of pending, re-invoke write after pending is finished
-                lastWriteFuture.addListener(new SshFutureListener<IoWriteFuture>() {
-                    @Override
-                    public void operationComplete(final IoWriteFuture future) {
-                        if(future.isWritten()) {
-                            synchronized (SshWriteAsyncHandler.this) {
-                                // Pending done, decrease counter
-                                pendingWriteCounter--;
-                            }
-                            write(ctx, msg, promise);
-                        } else {
-                            // Cannot reschedule pending, fail
-                            handlePendingFailed(ctx, e);
-                        }
-                    }
-
-                });
-            }
-        }
-
-        private void handlePendingFailed(final ChannelHandlerContext ctx, final Exception e) {
-            logger.warn("Exception while writing to SSH remote on channel {}", ctx.channel(), e);
-            try {
-                channelHandler.disconnect(ctx, ctx.newPromise());
-            } catch (final Exception ex) {
-                // This should not happen
-                throw new IllegalStateException(ex);
-            }
-        }
-
-        @Override
-        public void close() {
-            asyncIn = null;
-        }
-
-        private Buffer toBuffer(final Object msg) {
-            // TODO Buffer vs ByteBuf translate, Can we handle that better ?
-            Preconditions.checkState(msg instanceof ByteBuf);
-            final ByteBuf byteBuf = (ByteBuf) msg;
-            final byte[] temp = new byte[byteBuf.readableBytes()];
-            byteBuf.readBytes(temp, 0, byteBuf.readableBytes());
-            return new Buffer(temp);
-        }
-
-    }
 }