BUG-1621 Fix reconnecting. 73/10373/1
authorMaros Marsalek <mmarsale@cisco.com>
Wed, 27 Aug 2014 12:51:12 +0000 (14:51 +0200)
committerMaros Marsalek <mmarsale@cisco.com>
Wed, 27 Aug 2014 13:13:33 +0000 (15:13 +0200)
Netconf-connector did not reconnect after recent changes (Ssh mina integration)
SshHandler was in pipeline after listener in reconnect, thats why the listener received no event about session down.

Change-Id: Id39062f51bc3a0caf066ca49682a2acc837b06ef
Signed-off-by: Maros Marsalek <mmarsale@cisco.com>
opendaylight/commons/protocol-framework/src/main/java/org/opendaylight/protocol/framework/ReconnectPromise.java
opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/sal/NetconfDeviceSalFacade.java
opendaylight/netconf/netconf-client/src/main/java/org/opendaylight/controller/netconf/client/SshClientChannelInitializer.java
opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/controller/netconf/nettyutil/handler/ssh/client/AsyncSshHandler.java
opendaylight/netconf/netconf-ssh/src/test/java/org/opendaylight/controller/netconf/netty/SSHTest.java

index fe1012f443fc7824b1e2b8b3d3c4ed15ecbfb30d..ea87afa48dedddfe20ef5430aa60d25ecc594c1a 100644 (file)
@@ -47,10 +47,13 @@ final class ReconnectPromise<S extends ProtocolSession<?>, L extends SessionList
         pending = this.dispatcher.createClient(this.address, cs, b, new AbstractDispatcher.PipelineInitializer<S>() {
             @Override
             public void initializeChannel(final SocketChannel channel, final Promise<S> promise) {
-                initializer.initializeChannel(channel, promise);
-
                 // add closed channel handler
+                // This handler has to be added before initializer.initializeChannel is called
+                // Initializer might add some handlers using addFirst e.g. AsyncSshHandler and in that case
+                // closed channel handler is before the handler that invokes channel inactive event
                 channel.pipeline().addFirst(new ClosedChannelHandler(ReconnectPromise.this));
+
+                initializer.initializeChannel(channel, promise);
             }
         });
     }
@@ -88,6 +91,9 @@ final class ReconnectPromise<S extends ProtocolSession<?>, L extends SessionList
 
         @Override
         public void channelInactive(final ChannelHandlerContext ctx) throws Exception {
+            // Pass info about disconnect further and then reconnect
+            super.channelInactive(ctx);
+
             if (promise.isCancelled()) {
                 return;
             }
index 3cc513600dd709484f464fbae0530c7df67b4f29..2000e11a35db1f26b330cc5f6df732aa1df058a4 100644 (file)
@@ -95,7 +95,7 @@ public final class NetconfDeviceSalFacade implements AutoCloseable, RemoteDevice
     }
 
     @Override
-    public void onDeviceDisconnected() {
+    public synchronized void onDeviceDisconnected() {
         salProvider.getDatastoreAdapter().updateDeviceState(false, Collections.<QName>emptySet());
         salProvider.getMountInstance().onDeviceDisconnected();
     }
index 2aa5d15224be5efa4c4f6f56a2498eab9df5b4f7..7a14c4f4ac3507e953e4cdf595cc6575b38cd86e 100644 (file)
@@ -32,6 +32,7 @@ final class SshClientChannelInitializer extends AbstractChannelInitializer<Netco
     @Override
     public void initialize(final Channel ch, final Promise<NetconfClientSession> promise) {
         try {
+            // ssh handler has to be the first handler in pipeline
             ch.pipeline().addFirst(AsyncSshHandler.createForNetconfSubsystem(authenticationHandler));
             super.initialize(ch,promise);
         } catch (final IOException e) {
index 935cb8dcd06ca966e6c560560d2030150cced460..0d877c9ec73797010013df229b9101d86445304f 100644 (file)
@@ -147,7 +147,7 @@ public class AsyncSshHandler extends ChannelOutboundHandlerAdapter {
         connectPromise.setSuccess();
         connectPromise = null;
 
-        sshReadAsyncListener = new SshReadAsyncListener(ctx, channel.getAsyncOut());
+        sshReadAsyncListener = new SshReadAsyncListener(this, ctx, channel.getAsyncOut());
         sshWriteAsyncHandler = new SshWriteAsyncHandler(this, channel.getAsyncIn());
 
         ctx.fireChannelActive();
@@ -165,11 +165,6 @@ public class AsyncSshHandler extends ChannelOutboundHandlerAdapter {
         sshWriteAsyncHandler.write(ctx, msg, promise);
     }
 
-    private static void handleSshSessionClosed(final ChannelHandlerContext ctx) {
-        logger.debug("SSH session closed on channel: {}", ctx.channel());
-        ctx.fireChannelInactive();
-    }
-
     @Override
     public synchronized void connect(final ChannelHandlerContext ctx, final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) throws Exception {
         this.connectPromise = promise;
@@ -206,7 +201,8 @@ public class AsyncSshHandler extends ChannelOutboundHandlerAdapter {
         channel = null;
         promise.setSuccess();
 
-        handleSshSessionClosed(ctx);
+        logger.debug("SSH session closed on channel: {}", ctx.channel());
+        ctx.fireChannelInactive();
     }
 
     /**
@@ -216,13 +212,15 @@ public class AsyncSshHandler extends ChannelOutboundHandlerAdapter {
     private static class SshReadAsyncListener implements SshFutureListener<IoReadFuture>, AutoCloseable {
         private static final int BUFFER_SIZE = 8192;
 
+        private final ChannelOutboundHandler asyncSshHandler;
         private final ChannelHandlerContext ctx;
 
         private IoInputStream asyncOut;
         private Buffer buf;
         private IoReadFuture currentReadFuture;
 
-        public SshReadAsyncListener(final ChannelHandlerContext ctx, final IoInputStream asyncOut) {
+        public SshReadAsyncListener(final ChannelOutboundHandler asyncSshHandler, final ChannelHandlerContext ctx, final IoInputStream asyncOut) {
+            this.asyncSshHandler = asyncSshHandler;
             this.ctx = ctx;
             this.asyncOut = asyncOut;
             buf = new Buffer(BUFFER_SIZE);
@@ -234,11 +232,14 @@ public class AsyncSshHandler extends ChannelOutboundHandlerAdapter {
             if(future.getException() != null) {
 
                 if(asyncOut.isClosed() || asyncOut.isClosing()) {
-                    // We are closing
-                    handleSshSessionClosed(ctx);
+
+                    // Ssh dropped
+                    logger.debug("Ssh session dropped on channel: {}", ctx.channel(), future.getException());
+                    invokeDisconnect();
+                    return;
                 } else {
                     logger.warn("Exception while reading from SSH remote on channel {}", ctx.channel(), future.getException());
-                    throw new IllegalStateException("Exception while reading from SSH remote on channel " + ctx.channel(), future.getException());
+                    invokeDisconnect();
                 }
             }
 
@@ -252,6 +253,15 @@ public class AsyncSshHandler extends ChannelOutboundHandlerAdapter {
             }
         }
 
+        private void invokeDisconnect() {
+            try {
+                asyncSshHandler.disconnect(ctx, ctx.newPromise());
+            } catch (final Exception e) {
+                // This should not happen
+                throw new IllegalStateException(e);
+            }
+        }
+
         @Override
         public synchronized void close() {
             // Remove self as listener on close to prevent reading from closed input
@@ -281,10 +291,13 @@ public class AsyncSshHandler extends ChannelOutboundHandlerAdapter {
             this.asyncIn = asyncIn;
         }
 
+        int c = 0;
+
         public synchronized void write(final ChannelHandlerContext ctx, final Object msg, final ChannelPromise promise) {
             try {
-                if(asyncIn.isClosed() || asyncIn.isClosing()) {
-                    handleSshSessionClosed(ctx);
+                if(asyncIn == null || asyncIn.isClosed() || asyncIn.isClosing()) {
+                    // If we are closed/closing, set immediate fail
+                    promise.setFailure(new IllegalStateException("Channel closed"));
                 } else {
                     lastWriteFuture = asyncIn.write(toBuffer(msg));
                     lastWriteFuture.addListener(new SshFutureListener<IoWriteFuture>() {
@@ -296,8 +309,9 @@ public class AsyncSshHandler extends ChannelOutboundHandlerAdapter {
                             // Notify success or failure
                             if (future.isWritten()) {
                                 promise.setSuccess();
+                            } else {
+                                promise.setFailure(future.getException());
                             }
-                            promise.setFailure(future.getException());
 
                             // Reset last pending future
                             synchronized (SshWriteAsyncHandler.this) {
@@ -320,7 +334,7 @@ public class AsyncSshHandler extends ChannelOutboundHandlerAdapter {
                 lastWriteFuture.addListener(new SshFutureListener<IoWriteFuture>() {
                     @Override
                     public void operationComplete(final IoWriteFuture future) {
-                        if(future.isWritten()) {
+                        if (future.isWritten()) {
                             synchronized (SshWriteAsyncHandler.this) {
                                 // Pending done, decrease counter
                                 pendingWriteCounter--;
index b32e880537e06d44571875a3dda56c4a36dddf6f..b3478c36931a7286f06a7eea4d9bc56bd6ccc7eb 100644 (file)
@@ -62,6 +62,8 @@ public class SSHTest {
         AuthProvider authProvider = mock(AuthProviderImpl.class);
         doReturn(PEMGenerator.generate().toCharArray()).when(authProvider).getPEMAsCharArray();
         doReturn(true).when(authProvider).authenticated(anyString(), anyString());
+        doReturn("auth").when(authProvider).toString();
+
         NetconfSSHServer netconfSSHServer = NetconfSSHServer.start(10831, NetconfConfigUtil.getNetconfLocalAddress(),
                 authProvider, new NioEventLoopGroup());