import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelOutboundHandler;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;
import java.io.IOException;
import org.apache.sshd.common.future.CloseFuture;
import org.apache.sshd.common.future.SshFutureListener;
import org.apache.sshd.common.io.IoInputStream;
+import org.apache.sshd.common.io.IoOutputStream;
import org.apache.sshd.common.io.IoReadFuture;
+import org.apache.sshd.common.io.IoWriteFuture;
+import org.apache.sshd.common.io.WritePendingException;
import org.apache.sshd.common.util.Buffer;
import org.opendaylight.controller.netconf.nettyutil.handler.ssh.authentication.AuthenticationHandler;
import org.slf4j.Logger;
private final SshClient sshClient;
private SshReadAsyncListener sshReadAsyncListener;
+ private SshWriteAsyncHandler sshWriteAsyncHandler;
+
private ClientChannel channel;
private ClientSession session;
private ChannelPromise connectPromise;
+
public static AsyncSshHandler createForNetconfSubsystem(final AuthenticationHandler authenticationHandler) throws IOException {
return new AsyncSshHandler(authenticationHandler, DEFAULT_CLIENT);
}
connectPromise.setSuccess();
connectPromise = null;
- ctx.fireChannelActive();
- final IoInputStream asyncOut = channel.getAsyncOut();
- sshReadAsyncListener = new SshReadAsyncListener(ctx, asyncOut);
+ sshReadAsyncListener = new SshReadAsyncListener(this, ctx, channel.getAsyncOut());
+ // if readAsyncListener receives immediate close, it will close this handler and closing this handler sets channel variable to null
+ if(channel != null) {
+ sshWriteAsyncHandler = new SshWriteAsyncHandler(this, channel.getAsyncIn());
+ ctx.fireChannelActive();
+ }
}
private synchronized void handleSshSetupFailure(final ChannelHandlerContext ctx, final Throwable e) {
@Override
public synchronized void write(final ChannelHandlerContext ctx, final Object msg, final ChannelPromise promise) {
- try {
- if(channel.getAsyncIn().isClosed() || channel.getAsyncIn().isClosing()) {
- handleSshSessionClosed(ctx);
- } else {
- channel.getAsyncIn().write(toBuffer(msg));
- ((ByteBuf) msg).release();
- }
- } catch (final Exception e) {
- logger.warn("Exception while writing to SSH remote on channel {}", ctx.channel(), e);
- throw new IllegalStateException("Exception while writing to SSH remote on channel " + ctx.channel(),e);
- }
- }
-
- private static void handleSshSessionClosed(final ChannelHandlerContext ctx) {
- logger.debug("SSH session closed on channel: {}", ctx.channel());
- ctx.fireChannelInactive();
- }
-
- private Buffer toBuffer(final Object msg) {
- // TODO Buffer vs ByteBuf translate, Can we handle that better ?
- Preconditions.checkState(msg instanceof ByteBuf);
- final ByteBuf byteBuf = (ByteBuf) msg;
- final byte[] temp = new byte[byteBuf.readableBytes()];
- byteBuf.readBytes(temp, 0, byteBuf.readableBytes());
- return new Buffer(temp);
+ sshWriteAsyncHandler.write(ctx, msg, promise);
}
@Override
}
@Override
- public synchronized void disconnect(final ChannelHandlerContext ctx, final ChannelPromise promise) throws Exception {
+ public synchronized void disconnect(final ChannelHandlerContext ctx, final ChannelPromise promise) {
if(sshReadAsyncListener != null) {
sshReadAsyncListener.close();
}
- session.close(false).addListener(new SshFutureListener<CloseFuture>() {
- @Override
- public void operationComplete(final CloseFuture future) {
- if(future.isClosed() == false) {
- session.close(true);
+ if(sshWriteAsyncHandler != null) {
+ sshWriteAsyncHandler.close();
+ }
+
+ if(session!= null && !session.isClosed() && !session.isClosing()) {
+ session.close(false).addListener(new SshFutureListener<CloseFuture>() {
+ @Override
+ public void operationComplete(final CloseFuture future) {
+ if (future.isClosed() == false) {
+ session.close(true);
+ }
+ session = null;
}
- session = null;
- }
- });
+ });
+ }
channel = null;
+ promise.setSuccess();
+
+ logger.debug("SSH session closed on channel: {}", ctx.channel());
+ ctx.fireChannelInactive();
}
/**
private static class SshReadAsyncListener implements SshFutureListener<IoReadFuture>, AutoCloseable {
private static final int BUFFER_SIZE = 8192;
+ private final ChannelOutboundHandler asyncSshHandler;
private final ChannelHandlerContext ctx;
private IoInputStream asyncOut;
private Buffer buf;
private IoReadFuture currentReadFuture;
- public SshReadAsyncListener(final ChannelHandlerContext ctx, final IoInputStream asyncOut) {
+ public SshReadAsyncListener(final ChannelOutboundHandler asyncSshHandler, final ChannelHandlerContext ctx, final IoInputStream asyncOut) {
+ this.asyncSshHandler = asyncSshHandler;
this.ctx = ctx;
this.asyncOut = asyncOut;
buf = new Buffer(BUFFER_SIZE);
@Override
public synchronized void operationComplete(final IoReadFuture future) {
if(future.getException() != null) {
-
if(asyncOut.isClosed() || asyncOut.isClosing()) {
- // We are closing
- handleSshSessionClosed(ctx);
+ // Ssh dropped
+ logger.debug("Ssh session dropped on channel: {}", ctx.channel(), future.getException());
} else {
logger.warn("Exception while reading from SSH remote on channel {}", ctx.channel(), future.getException());
- throw new IllegalStateException("Exception while reading from SSH remote on channel " + ctx.channel(), future.getException());
}
+ invokeDisconnect();
+ return;
}
if (future.getRead() > 0) {
}
}
+ private void invokeDisconnect() {
+ try {
+ asyncSshHandler.disconnect(ctx, ctx.newPromise());
+ } catch (final Exception e) {
+ // This should not happen
+ throw new IllegalStateException(e);
+ }
+ }
+
@Override
- public synchronized void close() throws Exception {
+ public synchronized void close() {
// Remove self as listener on close to prevent reading from closed input
if(currentReadFuture != null) {
currentReadFuture.removeListener(this);
asyncOut = null;
}
}
+
+ private static final class SshWriteAsyncHandler implements AutoCloseable {
+ public static final int MAX_PENDING_WRITES = 100;
+
+ private final ChannelOutboundHandler channelHandler;
+ private IoOutputStream asyncIn;
+
+ // Counter that holds the amount of pending write messages
+ // Pending write can occur in case remote window is full
+ // In such case, we need to wait for the pending write to finish
+ private int pendingWriteCounter;
+ // Last write future, that can be pending
+ private IoWriteFuture lastWriteFuture;
+
+ public SshWriteAsyncHandler(final ChannelOutboundHandler channelHandler, final IoOutputStream asyncIn) {
+ this.channelHandler = channelHandler;
+ this.asyncIn = asyncIn;
+ }
+
+ int c = 0;
+
+ public synchronized void write(final ChannelHandlerContext ctx, final Object msg, final ChannelPromise promise) {
+ try {
+ if(asyncIn == null || asyncIn.isClosed() || asyncIn.isClosing()) {
+ // If we are closed/closing, set immediate fail
+ promise.setFailure(new IllegalStateException("Channel closed"));
+ } else {
+ lastWriteFuture = asyncIn.write(toBuffer(msg));
+ lastWriteFuture.addListener(new SshFutureListener<IoWriteFuture>() {
+
+ @Override
+ public void operationComplete(final IoWriteFuture future) {
+ ((ByteBuf) msg).release();
+
+ // Notify success or failure
+ if (future.isWritten()) {
+ promise.setSuccess();
+ } else {
+ promise.setFailure(future.getException());
+ }
+
+ // Reset last pending future
+ synchronized (SshWriteAsyncHandler.this) {
+ lastWriteFuture = null;
+ }
+ }
+ });
+ }
+ } catch (final WritePendingException e) {
+ // Check limit for pending writes
+ pendingWriteCounter++;
+ if(pendingWriteCounter > MAX_PENDING_WRITES) {
+ promise.setFailure(e);
+ handlePendingFailed(ctx, new IllegalStateException("Too much pending writes(" + MAX_PENDING_WRITES + ") on channel: " + ctx.channel() +
+ ", remote window is not getting read or is too small"));
+ }
+
+ logger.debug("Write pending to SSH remote on channel: {}, current pending count: {}", ctx.channel(), pendingWriteCounter);
+
+ // In case of pending, re-invoke write after pending is finished
+ Preconditions.checkNotNull(lastWriteFuture, "Write is pending, but there was no previous write attempt", e);
+ lastWriteFuture.addListener(new SshFutureListener<IoWriteFuture>() {
+ @Override
+ public void operationComplete(final IoWriteFuture future) {
+ if (future.isWritten()) {
+ synchronized (SshWriteAsyncHandler.this) {
+ // Pending done, decrease counter
+ pendingWriteCounter--;
+ }
+ write(ctx, msg, promise);
+ } else {
+ // Cannot reschedule pending, fail
+ handlePendingFailed(ctx, e);
+ }
+ }
+
+ });
+ }
+ }
+
+ private void handlePendingFailed(final ChannelHandlerContext ctx, final Exception e) {
+ logger.warn("Exception while writing to SSH remote on channel {}", ctx.channel(), e);
+ try {
+ channelHandler.disconnect(ctx, ctx.newPromise());
+ } catch (final Exception ex) {
+ // This should not happen
+ throw new IllegalStateException(ex);
+ }
+ }
+
+ @Override
+ public void close() {
+ asyncIn = null;
+ }
+
+ private Buffer toBuffer(final Object msg) {
+ // TODO Buffer vs ByteBuf translate, Can we handle that better ?
+ Preconditions.checkState(msg instanceof ByteBuf);
+ final ByteBuf byteBuf = (ByteBuf) msg;
+ final byte[] temp = new byte[byteBuf.readableBytes()];
+ byteBuf.readBytes(temp, 0, byteBuf.readableBytes());
+ return new Buffer(temp);
+ }
+
+ }
}