Merge "Fix channelInactive event handling in the netty pipeline for netconf."
[controller.git] / opendaylight / netconf / netconf-netty-util / src / main / java / org / opendaylight / controller / netconf / nettyutil / handler / ssh / client / AsyncSshHandler.java
index fa7d0900edc805ebd45f3b2edce8a21d46cfb031..05cd598cdc22f7b1265c661c447b850cab1a0256 100644 (file)
@@ -8,9 +8,13 @@
 
 package org.opendaylight.controller.netconf.nettyutil.handler.ssh.client;
 
+import com.google.common.base.Preconditions;
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelOutboundHandlerAdapter;
+import io.netty.channel.ChannelPromise;
 import java.io.IOException;
 import java.net.SocketAddress;
-
 import org.apache.sshd.ClientChannel;
 import org.apache.sshd.ClientSession;
 import org.apache.sshd.SshClient;
@@ -23,19 +27,12 @@ import org.opendaylight.controller.netconf.nettyutil.handler.ssh.authentication.
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.base.Preconditions;
-
-import io.netty.buffer.ByteBuf;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelOutboundHandlerAdapter;
-import io.netty.channel.ChannelPromise;
-
 /**
  * Netty SSH handler class. Acts as interface between Netty and SSH library.
  */
 public class AsyncSshHandler extends ChannelOutboundHandlerAdapter {
 
-    private static final Logger logger = LoggerFactory.getLogger(AsyncSshHandler.class);
+    private static final Logger LOG = LoggerFactory.getLogger(AsyncSshHandler.class);
     public static final String SUBSYSTEM = "netconf";
 
     public static final SshClient DEFAULT_CLIENT = SshClient.setUpDefaultClient();
@@ -77,7 +74,7 @@ public class AsyncSshHandler extends ChannelOutboundHandlerAdapter {
     }
 
     private void startSsh(final ChannelHandlerContext ctx, final SocketAddress address) {
-        logger.debug("Starting SSH to {} on channel: {}", address, ctx.channel());
+        LOG.debug("Starting SSH to {} on channel: {}", address, ctx.channel());
 
         final ConnectFuture sshConnectionFuture = sshClient.connect(authenticationHandler.getUsername(), address);
         sshConnectionFuture.addListener(new SshFutureListener<ConnectFuture>() {
@@ -94,7 +91,7 @@ public class AsyncSshHandler extends ChannelOutboundHandlerAdapter {
 
     private synchronized void handleSshSessionCreated(final ConnectFuture future, final ChannelHandlerContext ctx) {
         try {
-            logger.trace("SSH session created on channel: {}", ctx.channel());
+            LOG.trace("SSH session created on channel: {}", ctx.channel());
 
             session = future.getSession();
             final AuthFuture authenticateFuture = authenticationHandler.authenticate(session);
@@ -115,7 +112,7 @@ public class AsyncSshHandler extends ChannelOutboundHandlerAdapter {
 
     private synchronized void handleSshAuthenticated(final ClientSession session, final ChannelHandlerContext ctx) {
         try {
-            logger.debug("SSH session authenticated on channel: {}, server version: {}", ctx.channel(), session.getServerVersion());
+            LOG.debug("SSH session authenticated on channel: {}, server version: {}", ctx.channel(), session.getServerVersion());
 
             channel = session.createSubsystemChannel(SUBSYSTEM);
             channel.setStreaming(ClientChannel.Streaming.Async);
@@ -137,10 +134,9 @@ public class AsyncSshHandler extends ChannelOutboundHandlerAdapter {
     }
 
     private synchronized void handleSshChanelOpened(final ChannelHandlerContext ctx) {
-        logger.trace("SSH subsystem channel opened successfully on channel: {}", ctx.channel());
+        LOG.trace("SSH subsystem channel opened successfully on channel: {}", ctx.channel());
 
         connectPromise.setSuccess();
-        connectPromise = null;
 
         // TODO we should also read from error stream and at least log from that
 
@@ -164,10 +160,13 @@ public class AsyncSshHandler extends ChannelOutboundHandlerAdapter {
     }
 
     private synchronized void handleSshSetupFailure(final ChannelHandlerContext ctx, final Throwable e) {
-        logger.warn("Unable to setup SSH connection on channel: {}", ctx.channel(), e);
-        connectPromise.setFailure(e);
-        connectPromise = null;
-        throw new IllegalStateException("Unable to setup SSH connection on channel: " + ctx.channel(), e);
+        LOG.warn("Unable to setup SSH connection on channel: {}", ctx.channel(), e);
+        disconnect(ctx, ctx.newPromise());
+
+        // If the promise is not yet done, we have failed with initial connect and set connectPromise to failure
+        if(!connectPromise.isDone()) {
+            connectPromise.setFailure(e);
+        }
     }
 
     @Override
@@ -177,6 +176,7 @@ public class AsyncSshHandler extends ChannelOutboundHandlerAdapter {
 
     @Override
     public synchronized void connect(final ChannelHandlerContext ctx, final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) throws Exception {
+        LOG.debug("XXX session connecting on channel {}. promise: {} ", ctx.channel(), connectPromise);
         this.connectPromise = promise;
         startSsh(ctx, remoteAddress);
     }
@@ -188,14 +188,21 @@ public class AsyncSshHandler extends ChannelOutboundHandlerAdapter {
 
     @Override
     public synchronized void disconnect(final ChannelHandlerContext ctx, final ChannelPromise promise) {
-        if(sshReadAsyncListener != null) {
-            sshReadAsyncListener.close();
+        LOG.trace("Closing SSH session on channel: {} with connect promise in state: {}", ctx.channel(), connectPromise);
+
+        // If we have already succeeded and the session was dropped after, we need to fire inactive to notify reconnect logic
+        if(connectPromise.isSuccess()) {
+            ctx.fireChannelInactive();
         }
 
         if(sshWriteAsyncHandler != null) {
             sshWriteAsyncHandler.close();
         }
 
+        if(sshReadAsyncListener != null) {
+            sshReadAsyncListener.close();
+        }
+
         if(session!= null && !session.isClosed() && !session.isClosing()) {
             session.close(false).addListener(new SshFutureListener<CloseFuture>() {
                 @Override
@@ -208,11 +215,19 @@ public class AsyncSshHandler extends ChannelOutboundHandlerAdapter {
             });
         }
 
+        // Super disconnect is necessary in this case since we are using NioSocketChannel and it needs to cleanup its resources
+        // e.g. Socket that it tries to open in its constructor (https://bugs.opendaylight.org/show_bug.cgi?id=2430)
+        // TODO better solution would be to implement custom ChannelFactory + Channel that will use mina SSH lib internally: port this to custom channel implementation
+        try {
+            // Disconnect has to be closed after inactive channel event was fired, because it interferes with it
+            super.disconnect(ctx, ctx.newPromise());
+        } catch (final Exception e) {
+            LOG.warn("Unable to cleanup all resources for channel: {}. Ignoring.", ctx.channel(), e);
+        }
+
         channel = null;
         promise.setSuccess();
-
-        logger.debug("SSH session closed on channel: {}", ctx.channel());
-        ctx.fireChannelInactive();
+        LOG.debug("SSH session closed on channel: {}", ctx.channel());
     }
 
 }