From cb9e2e21d4f21ddb3b912cd1baa8bbb790acab22 Mon Sep 17 00:00:00 2001 From: Maros Marsalek Date: Wed, 27 Aug 2014 14:51:12 +0200 Subject: [PATCH] BUG-1621 Fix reconnecting. Netconf-connector did not reconnect after recent changes (Ssh mina integration) SshHandler was in pipeline after listener in reconnect, thats why the listener received no event about session down. Change-Id: Id39062f51bc3a0caf066ca49682a2acc837b06ef Signed-off-by: Maros Marsalek --- .../protocol/framework/ReconnectPromise.java | 10 ++++- .../netconf/sal/NetconfDeviceSalFacade.java | 2 +- .../client/SshClientChannelInitializer.java | 1 + .../handler/ssh/client/AsyncSshHandler.java | 44 ++++++++++++------- .../controller/netconf/netty/SSHTest.java | 2 + 5 files changed, 41 insertions(+), 18 deletions(-) diff --git a/opendaylight/commons/protocol-framework/src/main/java/org/opendaylight/protocol/framework/ReconnectPromise.java b/opendaylight/commons/protocol-framework/src/main/java/org/opendaylight/protocol/framework/ReconnectPromise.java index fe1012f443..ea87afa48d 100644 --- a/opendaylight/commons/protocol-framework/src/main/java/org/opendaylight/protocol/framework/ReconnectPromise.java +++ b/opendaylight/commons/protocol-framework/src/main/java/org/opendaylight/protocol/framework/ReconnectPromise.java @@ -47,10 +47,13 @@ final class ReconnectPromise, L extends SessionList pending = this.dispatcher.createClient(this.address, cs, b, new AbstractDispatcher.PipelineInitializer() { @Override public void initializeChannel(final SocketChannel channel, final Promise promise) { - initializer.initializeChannel(channel, promise); - // add closed channel handler + // This handler has to be added before initializer.initializeChannel is called + // Initializer might add some handlers using addFirst e.g. AsyncSshHandler and in that case + // closed channel handler is before the handler that invokes channel inactive event channel.pipeline().addFirst(new ClosedChannelHandler(ReconnectPromise.this)); + + initializer.initializeChannel(channel, promise); } }); } @@ -88,6 +91,9 @@ final class ReconnectPromise, L extends SessionList @Override public void channelInactive(final ChannelHandlerContext ctx) throws Exception { + // Pass info about disconnect further and then reconnect + super.channelInactive(ctx); + if (promise.isCancelled()) { return; } diff --git a/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/sal/NetconfDeviceSalFacade.java b/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/sal/NetconfDeviceSalFacade.java index 3cc513600d..2000e11a35 100644 --- a/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/sal/NetconfDeviceSalFacade.java +++ b/opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/sal/NetconfDeviceSalFacade.java @@ -95,7 +95,7 @@ public final class NetconfDeviceSalFacade implements AutoCloseable, RemoteDevice } @Override - public void onDeviceDisconnected() { + public synchronized void onDeviceDisconnected() { salProvider.getDatastoreAdapter().updateDeviceState(false, Collections.emptySet()); salProvider.getMountInstance().onDeviceDisconnected(); } diff --git a/opendaylight/netconf/netconf-client/src/main/java/org/opendaylight/controller/netconf/client/SshClientChannelInitializer.java b/opendaylight/netconf/netconf-client/src/main/java/org/opendaylight/controller/netconf/client/SshClientChannelInitializer.java index 2aa5d15224..7a14c4f4ac 100644 --- a/opendaylight/netconf/netconf-client/src/main/java/org/opendaylight/controller/netconf/client/SshClientChannelInitializer.java +++ b/opendaylight/netconf/netconf-client/src/main/java/org/opendaylight/controller/netconf/client/SshClientChannelInitializer.java @@ -32,6 +32,7 @@ final class SshClientChannelInitializer extends AbstractChannelInitializer promise) { try { + // ssh handler has to be the first handler in pipeline ch.pipeline().addFirst(AsyncSshHandler.createForNetconfSubsystem(authenticationHandler)); super.initialize(ch,promise); } catch (final IOException e) { diff --git a/opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/controller/netconf/nettyutil/handler/ssh/client/AsyncSshHandler.java b/opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/controller/netconf/nettyutil/handler/ssh/client/AsyncSshHandler.java index 935cb8dcd0..0d877c9ec7 100644 --- a/opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/controller/netconf/nettyutil/handler/ssh/client/AsyncSshHandler.java +++ b/opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/controller/netconf/nettyutil/handler/ssh/client/AsyncSshHandler.java @@ -147,7 +147,7 @@ public class AsyncSshHandler extends ChannelOutboundHandlerAdapter { connectPromise.setSuccess(); connectPromise = null; - sshReadAsyncListener = new SshReadAsyncListener(ctx, channel.getAsyncOut()); + sshReadAsyncListener = new SshReadAsyncListener(this, ctx, channel.getAsyncOut()); sshWriteAsyncHandler = new SshWriteAsyncHandler(this, channel.getAsyncIn()); ctx.fireChannelActive(); @@ -165,11 +165,6 @@ public class AsyncSshHandler extends ChannelOutboundHandlerAdapter { sshWriteAsyncHandler.write(ctx, msg, promise); } - private static void handleSshSessionClosed(final ChannelHandlerContext ctx) { - logger.debug("SSH session closed on channel: {}", ctx.channel()); - ctx.fireChannelInactive(); - } - @Override public synchronized void connect(final ChannelHandlerContext ctx, final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) throws Exception { this.connectPromise = promise; @@ -206,7 +201,8 @@ public class AsyncSshHandler extends ChannelOutboundHandlerAdapter { channel = null; promise.setSuccess(); - handleSshSessionClosed(ctx); + logger.debug("SSH session closed on channel: {}", ctx.channel()); + ctx.fireChannelInactive(); } /** @@ -216,13 +212,15 @@ public class AsyncSshHandler extends ChannelOutboundHandlerAdapter { private static class SshReadAsyncListener implements SshFutureListener, AutoCloseable { private static final int BUFFER_SIZE = 8192; + private final ChannelOutboundHandler asyncSshHandler; private final ChannelHandlerContext ctx; private IoInputStream asyncOut; private Buffer buf; private IoReadFuture currentReadFuture; - public SshReadAsyncListener(final ChannelHandlerContext ctx, final IoInputStream asyncOut) { + public SshReadAsyncListener(final ChannelOutboundHandler asyncSshHandler, final ChannelHandlerContext ctx, final IoInputStream asyncOut) { + this.asyncSshHandler = asyncSshHandler; this.ctx = ctx; this.asyncOut = asyncOut; buf = new Buffer(BUFFER_SIZE); @@ -234,11 +232,14 @@ public class AsyncSshHandler extends ChannelOutboundHandlerAdapter { if(future.getException() != null) { if(asyncOut.isClosed() || asyncOut.isClosing()) { - // We are closing - handleSshSessionClosed(ctx); + + // Ssh dropped + logger.debug("Ssh session dropped on channel: {}", ctx.channel(), future.getException()); + invokeDisconnect(); + return; } else { logger.warn("Exception while reading from SSH remote on channel {}", ctx.channel(), future.getException()); - throw new IllegalStateException("Exception while reading from SSH remote on channel " + ctx.channel(), future.getException()); + invokeDisconnect(); } } @@ -252,6 +253,15 @@ public class AsyncSshHandler extends ChannelOutboundHandlerAdapter { } } + private void invokeDisconnect() { + try { + asyncSshHandler.disconnect(ctx, ctx.newPromise()); + } catch (final Exception e) { + // This should not happen + throw new IllegalStateException(e); + } + } + @Override public synchronized void close() { // Remove self as listener on close to prevent reading from closed input @@ -281,10 +291,13 @@ public class AsyncSshHandler extends ChannelOutboundHandlerAdapter { this.asyncIn = asyncIn; } + int c = 0; + public synchronized void write(final ChannelHandlerContext ctx, final Object msg, final ChannelPromise promise) { try { - if(asyncIn.isClosed() || asyncIn.isClosing()) { - handleSshSessionClosed(ctx); + if(asyncIn == null || asyncIn.isClosed() || asyncIn.isClosing()) { + // If we are closed/closing, set immediate fail + promise.setFailure(new IllegalStateException("Channel closed")); } else { lastWriteFuture = asyncIn.write(toBuffer(msg)); lastWriteFuture.addListener(new SshFutureListener() { @@ -296,8 +309,9 @@ public class AsyncSshHandler extends ChannelOutboundHandlerAdapter { // Notify success or failure if (future.isWritten()) { promise.setSuccess(); + } else { + promise.setFailure(future.getException()); } - promise.setFailure(future.getException()); // Reset last pending future synchronized (SshWriteAsyncHandler.this) { @@ -320,7 +334,7 @@ public class AsyncSshHandler extends ChannelOutboundHandlerAdapter { lastWriteFuture.addListener(new SshFutureListener() { @Override public void operationComplete(final IoWriteFuture future) { - if(future.isWritten()) { + if (future.isWritten()) { synchronized (SshWriteAsyncHandler.this) { // Pending done, decrease counter pendingWriteCounter--; diff --git a/opendaylight/netconf/netconf-ssh/src/test/java/org/opendaylight/controller/netconf/netty/SSHTest.java b/opendaylight/netconf/netconf-ssh/src/test/java/org/opendaylight/controller/netconf/netty/SSHTest.java index b32e880537..b3478c3693 100644 --- a/opendaylight/netconf/netconf-ssh/src/test/java/org/opendaylight/controller/netconf/netty/SSHTest.java +++ b/opendaylight/netconf/netconf-ssh/src/test/java/org/opendaylight/controller/netconf/netty/SSHTest.java @@ -62,6 +62,8 @@ public class SSHTest { AuthProvider authProvider = mock(AuthProviderImpl.class); doReturn(PEMGenerator.generate().toCharArray()).when(authProvider).getPEMAsCharArray(); doReturn(true).when(authProvider).authenticated(anyString(), anyString()); + doReturn("auth").when(authProvider).toString(); + NetconfSSHServer netconfSSHServer = NetconfSSHServer.start(10831, NetconfConfigUtil.getNetconfLocalAddress(), authProvider, new NioEventLoopGroup()); -- 2.36.6