X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fnetconf%2Fnetconf-netty-util%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fnetconf%2Fnettyutil%2Fhandler%2Fssh%2Fclient%2FAsyncSshHandler.java;h=3d1e4784f2ded5677472e3a25d99adaa11e24cf9;hp=935cb8dcd06ca966e6c560560d2030150cced460;hb=2473df9920f0820fde7dcaf62eaf14166695a5f6;hpb=597deaeac655af6bff79cb892165a1d5a32ea826 diff --git a/opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/controller/netconf/nettyutil/handler/ssh/client/AsyncSshHandler.java b/opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/controller/netconf/nettyutil/handler/ssh/client/AsyncSshHandler.java index 935cb8dcd0..3d1e4784f2 100644 --- a/opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/controller/netconf/nettyutil/handler/ssh/client/AsyncSshHandler.java +++ b/opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/controller/netconf/nettyutil/handler/ssh/client/AsyncSshHandler.java @@ -147,10 +147,12 @@ public class AsyncSshHandler extends ChannelOutboundHandlerAdapter { connectPromise.setSuccess(); connectPromise = null; - sshReadAsyncListener = new SshReadAsyncListener(ctx, channel.getAsyncOut()); - sshWriteAsyncHandler = new SshWriteAsyncHandler(this, channel.getAsyncIn()); - - ctx.fireChannelActive(); + sshReadAsyncListener = new SshReadAsyncListener(this, ctx, channel.getAsyncOut()); + // if readAsyncListener receives immediate close, it will close this handler and closing this handler sets channel variable to null + if(channel != null) { + sshWriteAsyncHandler = new SshWriteAsyncHandler(this, channel.getAsyncIn()); + ctx.fireChannelActive(); + } } private synchronized void handleSshSetupFailure(final ChannelHandlerContext ctx, final Throwable e) { @@ -165,11 +167,6 @@ public class AsyncSshHandler extends ChannelOutboundHandlerAdapter { sshWriteAsyncHandler.write(ctx, msg, promise); } - private static void handleSshSessionClosed(final ChannelHandlerContext ctx) { - logger.debug("SSH session closed on channel: {}", ctx.channel()); - ctx.fireChannelInactive(); - } - @Override public synchronized void connect(final ChannelHandlerContext ctx, final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) throws Exception { this.connectPromise = promise; @@ -206,7 +203,8 @@ public class AsyncSshHandler extends ChannelOutboundHandlerAdapter { channel = null; promise.setSuccess(); - handleSshSessionClosed(ctx); + logger.debug("SSH session closed on channel: {}", ctx.channel()); + ctx.fireChannelInactive(); } /** @@ -216,13 +214,15 @@ public class AsyncSshHandler extends ChannelOutboundHandlerAdapter { private static class SshReadAsyncListener implements SshFutureListener, AutoCloseable { private static final int BUFFER_SIZE = 8192; + private final ChannelOutboundHandler asyncSshHandler; private final ChannelHandlerContext ctx; private IoInputStream asyncOut; private Buffer buf; private IoReadFuture currentReadFuture; - public SshReadAsyncListener(final ChannelHandlerContext ctx, final IoInputStream asyncOut) { + public SshReadAsyncListener(final ChannelOutboundHandler asyncSshHandler, final ChannelHandlerContext ctx, final IoInputStream asyncOut) { + this.asyncSshHandler = asyncSshHandler; this.ctx = ctx; this.asyncOut = asyncOut; buf = new Buffer(BUFFER_SIZE); @@ -232,14 +232,14 @@ public class AsyncSshHandler extends ChannelOutboundHandlerAdapter { @Override public synchronized void operationComplete(final IoReadFuture future) { if(future.getException() != null) { - if(asyncOut.isClosed() || asyncOut.isClosing()) { - // We are closing - handleSshSessionClosed(ctx); + // Ssh dropped + logger.debug("Ssh session dropped on channel: {}", ctx.channel(), future.getException()); } else { logger.warn("Exception while reading from SSH remote on channel {}", ctx.channel(), future.getException()); - throw new IllegalStateException("Exception while reading from SSH remote on channel " + ctx.channel(), future.getException()); } + invokeDisconnect(); + return; } if (future.getRead() > 0) { @@ -252,6 +252,15 @@ public class AsyncSshHandler extends ChannelOutboundHandlerAdapter { } } + private void invokeDisconnect() { + try { + asyncSshHandler.disconnect(ctx, ctx.newPromise()); + } catch (final Exception e) { + // This should not happen + throw new IllegalStateException(e); + } + } + @Override public synchronized void close() { // Remove self as listener on close to prevent reading from closed input @@ -281,10 +290,13 @@ public class AsyncSshHandler extends ChannelOutboundHandlerAdapter { this.asyncIn = asyncIn; } + int c = 0; + public synchronized void write(final ChannelHandlerContext ctx, final Object msg, final ChannelPromise promise) { try { - if(asyncIn.isClosed() || asyncIn.isClosing()) { - handleSshSessionClosed(ctx); + if(asyncIn == null || asyncIn.isClosed() || asyncIn.isClosing()) { + // If we are closed/closing, set immediate fail + promise.setFailure(new IllegalStateException("Channel closed")); } else { lastWriteFuture = asyncIn.write(toBuffer(msg)); lastWriteFuture.addListener(new SshFutureListener() { @@ -296,8 +308,9 @@ public class AsyncSshHandler extends ChannelOutboundHandlerAdapter { // Notify success or failure if (future.isWritten()) { promise.setSuccess(); + } else { + promise.setFailure(future.getException()); } - promise.setFailure(future.getException()); // Reset last pending future synchronized (SshWriteAsyncHandler.this) { @@ -310,22 +323,29 @@ public class AsyncSshHandler extends ChannelOutboundHandlerAdapter { // Check limit for pending writes pendingWriteCounter++; if(pendingWriteCounter > MAX_PENDING_WRITES) { + promise.setFailure(e); handlePendingFailed(ctx, new IllegalStateException("Too much pending writes(" + MAX_PENDING_WRITES + ") on channel: " + ctx.channel() + ", remote window is not getting read or is too small")); } + // We need to reset buffer read index, since we've already read it when we tried to write it the first time + ((ByteBuf) msg).resetReaderIndex(); logger.debug("Write pending to SSH remote on channel: {}, current pending count: {}", ctx.channel(), pendingWriteCounter); // In case of pending, re-invoke write after pending is finished + Preconditions.checkNotNull(lastWriteFuture, "Write is pending, but there was no previous write attempt", e); lastWriteFuture.addListener(new SshFutureListener() { @Override public void operationComplete(final IoWriteFuture future) { - if(future.isWritten()) { + // FIXME possible minor race condition, we cannot guarantee that this callback when pending is finished will be executed first + // External thread could trigger write on this instance while we are on this line + // Verify + if (future.isWritten()) { synchronized (SshWriteAsyncHandler.this) { // Pending done, decrease counter pendingWriteCounter--; + write(ctx, msg, promise); } - write(ctx, msg, promise); } else { // Cannot reschedule pending, fail handlePendingFailed(ctx, e);