Merge "BUG-1621 Fix reconnecting."
[controller.git] / opendaylight / netconf / netconf-netty-util / src / main / java / org / opendaylight / controller / netconf / nettyutil / handler / ssh / client / AsyncSshHandler.java
index 935cb8dcd06ca966e6c560560d2030150cced460..0d877c9ec73797010013df229b9101d86445304f 100644 (file)
@@ -147,7 +147,7 @@ public class AsyncSshHandler extends ChannelOutboundHandlerAdapter {
         connectPromise.setSuccess();
         connectPromise = null;
 
-        sshReadAsyncListener = new SshReadAsyncListener(ctx, channel.getAsyncOut());
+        sshReadAsyncListener = new SshReadAsyncListener(this, ctx, channel.getAsyncOut());
         sshWriteAsyncHandler = new SshWriteAsyncHandler(this, channel.getAsyncIn());
 
         ctx.fireChannelActive();
@@ -165,11 +165,6 @@ public class AsyncSshHandler extends ChannelOutboundHandlerAdapter {
         sshWriteAsyncHandler.write(ctx, msg, promise);
     }
 
-    private static void handleSshSessionClosed(final ChannelHandlerContext ctx) {
-        logger.debug("SSH session closed on channel: {}", ctx.channel());
-        ctx.fireChannelInactive();
-    }
-
     @Override
     public synchronized void connect(final ChannelHandlerContext ctx, final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) throws Exception {
         this.connectPromise = promise;
@@ -206,7 +201,8 @@ public class AsyncSshHandler extends ChannelOutboundHandlerAdapter {
         channel = null;
         promise.setSuccess();
 
-        handleSshSessionClosed(ctx);
+        logger.debug("SSH session closed on channel: {}", ctx.channel());
+        ctx.fireChannelInactive();
     }
 
     /**
@@ -216,13 +212,15 @@ public class AsyncSshHandler extends ChannelOutboundHandlerAdapter {
     private static class SshReadAsyncListener implements SshFutureListener<IoReadFuture>, AutoCloseable {
         private static final int BUFFER_SIZE = 8192;
 
+        private final ChannelOutboundHandler asyncSshHandler;
         private final ChannelHandlerContext ctx;
 
         private IoInputStream asyncOut;
         private Buffer buf;
         private IoReadFuture currentReadFuture;
 
-        public SshReadAsyncListener(final ChannelHandlerContext ctx, final IoInputStream asyncOut) {
+        public SshReadAsyncListener(final ChannelOutboundHandler asyncSshHandler, final ChannelHandlerContext ctx, final IoInputStream asyncOut) {
+            this.asyncSshHandler = asyncSshHandler;
             this.ctx = ctx;
             this.asyncOut = asyncOut;
             buf = new Buffer(BUFFER_SIZE);
@@ -234,11 +232,14 @@ public class AsyncSshHandler extends ChannelOutboundHandlerAdapter {
             if(future.getException() != null) {
 
                 if(asyncOut.isClosed() || asyncOut.isClosing()) {
-                    // We are closing
-                    handleSshSessionClosed(ctx);
+
+                    // Ssh dropped
+                    logger.debug("Ssh session dropped on channel: {}", ctx.channel(), future.getException());
+                    invokeDisconnect();
+                    return;
                 } else {
                     logger.warn("Exception while reading from SSH remote on channel {}", ctx.channel(), future.getException());
-                    throw new IllegalStateException("Exception while reading from SSH remote on channel " + ctx.channel(), future.getException());
+                    invokeDisconnect();
                 }
             }
 
@@ -252,6 +253,15 @@ public class AsyncSshHandler extends ChannelOutboundHandlerAdapter {
             }
         }
 
+        private void invokeDisconnect() {
+            try {
+                asyncSshHandler.disconnect(ctx, ctx.newPromise());
+            } catch (final Exception e) {
+                // This should not happen
+                throw new IllegalStateException(e);
+            }
+        }
+
         @Override
         public synchronized void close() {
             // Remove self as listener on close to prevent reading from closed input
@@ -281,10 +291,13 @@ public class AsyncSshHandler extends ChannelOutboundHandlerAdapter {
             this.asyncIn = asyncIn;
         }
 
+        int c = 0;
+
         public synchronized void write(final ChannelHandlerContext ctx, final Object msg, final ChannelPromise promise) {
             try {
-                if(asyncIn.isClosed() || asyncIn.isClosing()) {
-                    handleSshSessionClosed(ctx);
+                if(asyncIn == null || asyncIn.isClosed() || asyncIn.isClosing()) {
+                    // If we are closed/closing, set immediate fail
+                    promise.setFailure(new IllegalStateException("Channel closed"));
                 } else {
                     lastWriteFuture = asyncIn.write(toBuffer(msg));
                     lastWriteFuture.addListener(new SshFutureListener<IoWriteFuture>() {
@@ -296,8 +309,9 @@ public class AsyncSshHandler extends ChannelOutboundHandlerAdapter {
                             // Notify success or failure
                             if (future.isWritten()) {
                                 promise.setSuccess();
+                            } else {
+                                promise.setFailure(future.getException());
                             }
-                            promise.setFailure(future.getException());
 
                             // Reset last pending future
                             synchronized (SshWriteAsyncHandler.this) {
@@ -320,7 +334,7 @@ public class AsyncSshHandler extends ChannelOutboundHandlerAdapter {
                 lastWriteFuture.addListener(new SshFutureListener<IoWriteFuture>() {
                     @Override
                     public void operationComplete(final IoWriteFuture future) {
-                        if(future.isWritten()) {
+                        if (future.isWritten()) {
                             synchronized (SshWriteAsyncHandler.this) {
                                 // Pending done, decrease counter
                                 pendingWriteCounter--;