X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fnetconf%2Fnetconf-netty-util%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fnetconf%2Fnettyutil%2Fhandler%2Fssh%2Fclient%2FAsyncSshHandler.java;h=3d1e4784f2ded5677472e3a25d99adaa11e24cf9;hp=2761a45d03bedc8730cb69036748c266cdc7f412;hb=2473df9920f0820fde7dcaf62eaf14166695a5f6;hpb=315a10ec8b79abec3f4d718359ebb4202bffcb11 diff --git a/opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/controller/netconf/nettyutil/handler/ssh/client/AsyncSshHandler.java b/opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/controller/netconf/nettyutil/handler/ssh/client/AsyncSshHandler.java index 2761a45d03..3d1e4784f2 100644 --- a/opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/controller/netconf/nettyutil/handler/ssh/client/AsyncSshHandler.java +++ b/opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/controller/netconf/nettyutil/handler/ssh/client/AsyncSshHandler.java @@ -12,6 +12,7 @@ import com.google.common.base.Preconditions; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelOutboundHandler; import io.netty.channel.ChannelOutboundHandlerAdapter; import io.netty.channel.ChannelPromise; import java.io.IOException; @@ -25,7 +26,10 @@ import org.apache.sshd.client.future.OpenFuture; import org.apache.sshd.common.future.CloseFuture; import org.apache.sshd.common.future.SshFutureListener; import org.apache.sshd.common.io.IoInputStream; +import org.apache.sshd.common.io.IoOutputStream; import org.apache.sshd.common.io.IoReadFuture; +import org.apache.sshd.common.io.IoWriteFuture; +import org.apache.sshd.common.io.WritePendingException; import org.apache.sshd.common.util.Buffer; import org.opendaylight.controller.netconf.nettyutil.handler.ssh.authentication.AuthenticationHandler; import org.slf4j.Logger; @@ -53,10 +57,13 @@ public class AsyncSshHandler extends ChannelOutboundHandlerAdapter { private final SshClient sshClient; private SshReadAsyncListener sshReadAsyncListener; + private SshWriteAsyncHandler sshWriteAsyncHandler; + private ClientChannel channel; private ClientSession session; private ChannelPromise connectPromise; + public static AsyncSshHandler createForNetconfSubsystem(final AuthenticationHandler authenticationHandler) throws IOException { return new AsyncSshHandler(authenticationHandler, DEFAULT_CLIENT); } @@ -139,10 +146,13 @@ public class AsyncSshHandler extends ChannelOutboundHandlerAdapter { connectPromise.setSuccess(); connectPromise = null; - ctx.fireChannelActive(); - final IoInputStream asyncOut = channel.getAsyncOut(); - sshReadAsyncListener = new SshReadAsyncListener(ctx, asyncOut); + sshReadAsyncListener = new SshReadAsyncListener(this, ctx, channel.getAsyncOut()); + // if readAsyncListener receives immediate close, it will close this handler and closing this handler sets channel variable to null + if(channel != null) { + sshWriteAsyncHandler = new SshWriteAsyncHandler(this, channel.getAsyncIn()); + ctx.fireChannelActive(); + } } private synchronized void handleSshSetupFailure(final ChannelHandlerContext ctx, final Throwable e) { @@ -154,31 +164,7 @@ public class AsyncSshHandler extends ChannelOutboundHandlerAdapter { @Override public synchronized void write(final ChannelHandlerContext ctx, final Object msg, final ChannelPromise promise) { - try { - if(channel.getAsyncIn().isClosed() || channel.getAsyncIn().isClosing()) { - handleSshSessionClosed(ctx); - } else { - channel.getAsyncIn().write(toBuffer(msg)); - ((ByteBuf) msg).release(); - } - } catch (final Exception e) { - logger.warn("Exception while writing to SSH remote on channel {}", ctx.channel(), e); - throw new IllegalStateException("Exception while writing to SSH remote on channel " + ctx.channel(),e); - } - } - - private static void handleSshSessionClosed(final ChannelHandlerContext ctx) { - logger.debug("SSH session closed on channel: {}", ctx.channel()); - ctx.fireChannelInactive(); - } - - private Buffer toBuffer(final Object msg) { - // TODO Buffer vs ByteBuf translate, Can we handle that better ? - Preconditions.checkState(msg instanceof ByteBuf); - final ByteBuf byteBuf = (ByteBuf) msg; - final byte[] temp = new byte[byteBuf.readableBytes()]; - byteBuf.readBytes(temp, 0, byteBuf.readableBytes()); - return new Buffer(temp); + sshWriteAsyncHandler.write(ctx, msg, promise); } @Override @@ -193,22 +179,32 @@ public class AsyncSshHandler extends ChannelOutboundHandlerAdapter { } @Override - public synchronized void disconnect(final ChannelHandlerContext ctx, final ChannelPromise promise) throws Exception { + public synchronized void disconnect(final ChannelHandlerContext ctx, final ChannelPromise promise) { if(sshReadAsyncListener != null) { sshReadAsyncListener.close(); } - session.close(false).addListener(new SshFutureListener() { - @Override - public void operationComplete(final CloseFuture future) { - if(future.isClosed() == false) { - session.close(true); + if(sshWriteAsyncHandler != null) { + sshWriteAsyncHandler.close(); + } + + if(session!= null && !session.isClosed() && !session.isClosing()) { + session.close(false).addListener(new SshFutureListener() { + @Override + public void operationComplete(final CloseFuture future) { + if (future.isClosed() == false) { + session.close(true); + } + session = null; } - session = null; - } - }); + }); + } channel = null; + promise.setSuccess(); + + logger.debug("SSH session closed on channel: {}", ctx.channel()); + ctx.fireChannelInactive(); } /** @@ -218,13 +214,15 @@ public class AsyncSshHandler extends ChannelOutboundHandlerAdapter { private static class SshReadAsyncListener implements SshFutureListener, AutoCloseable { private static final int BUFFER_SIZE = 8192; + private final ChannelOutboundHandler asyncSshHandler; private final ChannelHandlerContext ctx; private IoInputStream asyncOut; private Buffer buf; private IoReadFuture currentReadFuture; - public SshReadAsyncListener(final ChannelHandlerContext ctx, final IoInputStream asyncOut) { + public SshReadAsyncListener(final ChannelOutboundHandler asyncSshHandler, final ChannelHandlerContext ctx, final IoInputStream asyncOut) { + this.asyncSshHandler = asyncSshHandler; this.ctx = ctx; this.asyncOut = asyncOut; buf = new Buffer(BUFFER_SIZE); @@ -234,14 +232,14 @@ public class AsyncSshHandler extends ChannelOutboundHandlerAdapter { @Override public synchronized void operationComplete(final IoReadFuture future) { if(future.getException() != null) { - if(asyncOut.isClosed() || asyncOut.isClosing()) { - // We are closing - handleSshSessionClosed(ctx); + // Ssh dropped + logger.debug("Ssh session dropped on channel: {}", ctx.channel(), future.getException()); } else { logger.warn("Exception while reading from SSH remote on channel {}", ctx.channel(), future.getException()); - throw new IllegalStateException("Exception while reading from SSH remote on channel " + ctx.channel(), future.getException()); } + invokeDisconnect(); + return; } if (future.getRead() > 0) { @@ -254,8 +252,17 @@ public class AsyncSshHandler extends ChannelOutboundHandlerAdapter { } } + private void invokeDisconnect() { + try { + asyncSshHandler.disconnect(ctx, ctx.newPromise()); + } catch (final Exception e) { + // This should not happen + throw new IllegalStateException(e); + } + } + @Override - public synchronized void close() throws Exception { + public synchronized void close() { // Remove self as listener on close to prevent reading from closed input if(currentReadFuture != null) { currentReadFuture.removeListener(this); @@ -264,4 +271,114 @@ public class AsyncSshHandler extends ChannelOutboundHandlerAdapter { asyncOut = null; } } + + private static final class SshWriteAsyncHandler implements AutoCloseable { + public static final int MAX_PENDING_WRITES = 100; + + private final ChannelOutboundHandler channelHandler; + private IoOutputStream asyncIn; + + // Counter that holds the amount of pending write messages + // Pending write can occur in case remote window is full + // In such case, we need to wait for the pending write to finish + private int pendingWriteCounter; + // Last write future, that can be pending + private IoWriteFuture lastWriteFuture; + + public SshWriteAsyncHandler(final ChannelOutboundHandler channelHandler, final IoOutputStream asyncIn) { + this.channelHandler = channelHandler; + this.asyncIn = asyncIn; + } + + int c = 0; + + public synchronized void write(final ChannelHandlerContext ctx, final Object msg, final ChannelPromise promise) { + try { + if(asyncIn == null || asyncIn.isClosed() || asyncIn.isClosing()) { + // If we are closed/closing, set immediate fail + promise.setFailure(new IllegalStateException("Channel closed")); + } else { + lastWriteFuture = asyncIn.write(toBuffer(msg)); + lastWriteFuture.addListener(new SshFutureListener() { + + @Override + public void operationComplete(final IoWriteFuture future) { + ((ByteBuf) msg).release(); + + // Notify success or failure + if (future.isWritten()) { + promise.setSuccess(); + } else { + promise.setFailure(future.getException()); + } + + // Reset last pending future + synchronized (SshWriteAsyncHandler.this) { + lastWriteFuture = null; + } + } + }); + } + } catch (final WritePendingException e) { + // Check limit for pending writes + pendingWriteCounter++; + if(pendingWriteCounter > MAX_PENDING_WRITES) { + promise.setFailure(e); + handlePendingFailed(ctx, new IllegalStateException("Too much pending writes(" + MAX_PENDING_WRITES + ") on channel: " + ctx.channel() + + ", remote window is not getting read or is too small")); + } + + // We need to reset buffer read index, since we've already read it when we tried to write it the first time + ((ByteBuf) msg).resetReaderIndex(); + logger.debug("Write pending to SSH remote on channel: {}, current pending count: {}", ctx.channel(), pendingWriteCounter); + + // In case of pending, re-invoke write after pending is finished + Preconditions.checkNotNull(lastWriteFuture, "Write is pending, but there was no previous write attempt", e); + lastWriteFuture.addListener(new SshFutureListener() { + @Override + public void operationComplete(final IoWriteFuture future) { + // FIXME possible minor race condition, we cannot guarantee that this callback when pending is finished will be executed first + // External thread could trigger write on this instance while we are on this line + // Verify + if (future.isWritten()) { + synchronized (SshWriteAsyncHandler.this) { + // Pending done, decrease counter + pendingWriteCounter--; + write(ctx, msg, promise); + } + } else { + // Cannot reschedule pending, fail + handlePendingFailed(ctx, e); + } + } + + }); + } + } + + private void handlePendingFailed(final ChannelHandlerContext ctx, final Exception e) { + logger.warn("Exception while writing to SSH remote on channel {}", ctx.channel(), e); + try { + channelHandler.disconnect(ctx, ctx.newPromise()); + } catch (final Exception ex) { + // This should not happen + throw new IllegalStateException(ex); + } + } + + @Override + public void close() { + asyncIn = null; + } + + private Buffer toBuffer(final Object msg) { + // TODO Buffer vs ByteBuf translate, Can we handle that better ? + Preconditions.checkState(msg instanceof ByteBuf); + final ByteBuf byteBuf = (ByteBuf) msg; + final byte[] temp = new byte[byteBuf.readableBytes()]; + byteBuf.readBytes(temp, 0, byteBuf.readableBytes()); + return new Buffer(temp); + } + + } }