X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fnetconf%2Fnetconf-netty-util%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fnetconf%2Fnettyutil%2Fhandler%2Fssh%2Fclient%2FAsyncSshHandler.java;h=935cb8dcd06ca966e6c560560d2030150cced460;hb=76b553837d30cd4c9d423251d9c4b9e218e53f6a;hp=2761a45d03bedc8730cb69036748c266cdc7f412;hpb=91eec932865a8b38e9ab33b3fa566f1a0a9534e9;p=controller.git diff --git a/opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/controller/netconf/nettyutil/handler/ssh/client/AsyncSshHandler.java b/opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/controller/netconf/nettyutil/handler/ssh/client/AsyncSshHandler.java index 2761a45d03..935cb8dcd0 100644 --- a/opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/controller/netconf/nettyutil/handler/ssh/client/AsyncSshHandler.java +++ b/opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/controller/netconf/nettyutil/handler/ssh/client/AsyncSshHandler.java @@ -12,6 +12,7 @@ import com.google.common.base.Preconditions; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelOutboundHandler; import io.netty.channel.ChannelOutboundHandlerAdapter; import io.netty.channel.ChannelPromise; import java.io.IOException; @@ -25,7 +26,10 @@ import org.apache.sshd.client.future.OpenFuture; import org.apache.sshd.common.future.CloseFuture; import org.apache.sshd.common.future.SshFutureListener; import org.apache.sshd.common.io.IoInputStream; +import org.apache.sshd.common.io.IoOutputStream; import org.apache.sshd.common.io.IoReadFuture; +import org.apache.sshd.common.io.IoWriteFuture; +import org.apache.sshd.common.io.WritePendingException; import org.apache.sshd.common.util.Buffer; import org.opendaylight.controller.netconf.nettyutil.handler.ssh.authentication.AuthenticationHandler; import org.slf4j.Logger; @@ -53,10 +57,13 @@ public class AsyncSshHandler extends ChannelOutboundHandlerAdapter { private final SshClient sshClient; private SshReadAsyncListener sshReadAsyncListener; + private SshWriteAsyncHandler sshWriteAsyncHandler; + private ClientChannel channel; private ClientSession session; private ChannelPromise connectPromise; + public static AsyncSshHandler createForNetconfSubsystem(final AuthenticationHandler authenticationHandler) throws IOException { return new AsyncSshHandler(authenticationHandler, DEFAULT_CLIENT); } @@ -139,10 +146,11 @@ public class AsyncSshHandler extends ChannelOutboundHandlerAdapter { connectPromise.setSuccess(); connectPromise = null; - ctx.fireChannelActive(); - final IoInputStream asyncOut = channel.getAsyncOut(); - sshReadAsyncListener = new SshReadAsyncListener(ctx, asyncOut); + sshReadAsyncListener = new SshReadAsyncListener(ctx, channel.getAsyncOut()); + sshWriteAsyncHandler = new SshWriteAsyncHandler(this, channel.getAsyncIn()); + + ctx.fireChannelActive(); } private synchronized void handleSshSetupFailure(final ChannelHandlerContext ctx, final Throwable e) { @@ -154,17 +162,7 @@ public class AsyncSshHandler extends ChannelOutboundHandlerAdapter { @Override public synchronized void write(final ChannelHandlerContext ctx, final Object msg, final ChannelPromise promise) { - try { - if(channel.getAsyncIn().isClosed() || channel.getAsyncIn().isClosing()) { - handleSshSessionClosed(ctx); - } else { - channel.getAsyncIn().write(toBuffer(msg)); - ((ByteBuf) msg).release(); - } - } catch (final Exception e) { - logger.warn("Exception while writing to SSH remote on channel {}", ctx.channel(), e); - throw new IllegalStateException("Exception while writing to SSH remote on channel " + ctx.channel(),e); - } + sshWriteAsyncHandler.write(ctx, msg, promise); } private static void handleSshSessionClosed(final ChannelHandlerContext ctx) { @@ -172,15 +170,6 @@ public class AsyncSshHandler extends ChannelOutboundHandlerAdapter { ctx.fireChannelInactive(); } - private Buffer toBuffer(final Object msg) { - // TODO Buffer vs ByteBuf translate, Can we handle that better ? - Preconditions.checkState(msg instanceof ByteBuf); - final ByteBuf byteBuf = (ByteBuf) msg; - final byte[] temp = new byte[byteBuf.readableBytes()]; - byteBuf.readBytes(temp, 0, byteBuf.readableBytes()); - return new Buffer(temp); - } - @Override public synchronized void connect(final ChannelHandlerContext ctx, final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) throws Exception { this.connectPromise = promise; @@ -193,22 +182,31 @@ public class AsyncSshHandler extends ChannelOutboundHandlerAdapter { } @Override - public synchronized void disconnect(final ChannelHandlerContext ctx, final ChannelPromise promise) throws Exception { + public synchronized void disconnect(final ChannelHandlerContext ctx, final ChannelPromise promise) { if(sshReadAsyncListener != null) { sshReadAsyncListener.close(); } - session.close(false).addListener(new SshFutureListener() { - @Override - public void operationComplete(final CloseFuture future) { - if(future.isClosed() == false) { - session.close(true); + if(sshWriteAsyncHandler != null) { + sshWriteAsyncHandler.close(); + } + + if(session!= null && !session.isClosed() && !session.isClosing()) { + session.close(false).addListener(new SshFutureListener() { + @Override + public void operationComplete(final CloseFuture future) { + if (future.isClosed() == false) { + session.close(true); + } + session = null; } - session = null; - } - }); + }); + } channel = null; + promise.setSuccess(); + + handleSshSessionClosed(ctx); } /** @@ -255,7 +253,7 @@ public class AsyncSshHandler extends ChannelOutboundHandlerAdapter { } @Override - public synchronized void close() throws Exception { + public synchronized void close() { // Remove self as listener on close to prevent reading from closed input if(currentReadFuture != null) { currentReadFuture.removeListener(this); @@ -264,4 +262,103 @@ public class AsyncSshHandler extends ChannelOutboundHandlerAdapter { asyncOut = null; } } + + private static final class SshWriteAsyncHandler implements AutoCloseable { + public static final int MAX_PENDING_WRITES = 100; + + private final ChannelOutboundHandler channelHandler; + private IoOutputStream asyncIn; + + // Counter that holds the amount of pending write messages + // Pending write can occur in case remote window is full + // In such case, we need to wait for the pending write to finish + private int pendingWriteCounter; + // Last write future, that can be pending + private IoWriteFuture lastWriteFuture; + + public SshWriteAsyncHandler(final ChannelOutboundHandler channelHandler, final IoOutputStream asyncIn) { + this.channelHandler = channelHandler; + this.asyncIn = asyncIn; + } + + public synchronized void write(final ChannelHandlerContext ctx, final Object msg, final ChannelPromise promise) { + try { + if(asyncIn.isClosed() || asyncIn.isClosing()) { + handleSshSessionClosed(ctx); + } else { + lastWriteFuture = asyncIn.write(toBuffer(msg)); + lastWriteFuture.addListener(new SshFutureListener() { + + @Override + public void operationComplete(final IoWriteFuture future) { + ((ByteBuf) msg).release(); + + // Notify success or failure + if (future.isWritten()) { + promise.setSuccess(); + } + promise.setFailure(future.getException()); + + // Reset last pending future + synchronized (SshWriteAsyncHandler.this) { + lastWriteFuture = null; + } + } + }); + } + } catch (final WritePendingException e) { + // Check limit for pending writes + pendingWriteCounter++; + if(pendingWriteCounter > MAX_PENDING_WRITES) { + handlePendingFailed(ctx, new IllegalStateException("Too much pending writes(" + MAX_PENDING_WRITES + ") on channel: " + ctx.channel() + + ", remote window is not getting read or is too small")); + } + + logger.debug("Write pending to SSH remote on channel: {}, current pending count: {}", ctx.channel(), pendingWriteCounter); + + // In case of pending, re-invoke write after pending is finished + lastWriteFuture.addListener(new SshFutureListener() { + @Override + public void operationComplete(final IoWriteFuture future) { + if(future.isWritten()) { + synchronized (SshWriteAsyncHandler.this) { + // Pending done, decrease counter + pendingWriteCounter--; + } + write(ctx, msg, promise); + } else { + // Cannot reschedule pending, fail + handlePendingFailed(ctx, e); + } + } + + }); + } + } + + private void handlePendingFailed(final ChannelHandlerContext ctx, final Exception e) { + logger.warn("Exception while writing to SSH remote on channel {}", ctx.channel(), e); + try { + channelHandler.disconnect(ctx, ctx.newPromise()); + } catch (final Exception ex) { + // This should not happen + throw new IllegalStateException(ex); + } + } + + @Override + public void close() { + asyncIn = null; + } + + private Buffer toBuffer(final Object msg) { + // TODO Buffer vs ByteBuf translate, Can we handle that better ? + Preconditions.checkState(msg instanceof ByteBuf); + final ByteBuf byteBuf = (ByteBuf) msg; + final byte[] temp = new byte[byteBuf.readableBytes()]; + byteBuf.readBytes(temp, 0, byteBuf.readableBytes()); + return new Buffer(temp); + } + + } }