package org.opendaylight.controller.netconf.nettyutil.handler.ssh.client;
import com.google.common.base.Preconditions;
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelOutboundHandler;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;
import java.io.IOException;
import org.apache.sshd.client.future.OpenFuture;
import org.apache.sshd.common.future.CloseFuture;
import org.apache.sshd.common.future.SshFutureListener;
-import org.apache.sshd.common.io.IoInputStream;
-import org.apache.sshd.common.io.IoOutputStream;
-import org.apache.sshd.common.io.IoReadFuture;
-import org.apache.sshd.common.io.IoWriteFuture;
-import org.apache.sshd.common.io.WritePendingException;
-import org.apache.sshd.common.util.Buffer;
import org.opendaylight.controller.netconf.nettyutil.handler.ssh.authentication.AuthenticationHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
private final AuthenticationHandler authenticationHandler;
private final SshClient sshClient;
- private SshReadAsyncListener sshReadAsyncListener;
- private SshWriteAsyncHandler sshWriteAsyncHandler;
+ private AsyncSshHanderReader sshReadAsyncListener;
+ private AsyncSshHandlerWriter sshWriteAsyncHandler;
private ClientChannel channel;
private ClientSession session;
connectPromise.setSuccess();
connectPromise = null;
- sshReadAsyncListener = new SshReadAsyncListener(ctx, channel.getAsyncOut());
- sshWriteAsyncHandler = new SshWriteAsyncHandler(this, channel.getAsyncIn());
-
- ctx.fireChannelActive();
+ sshReadAsyncListener = new AsyncSshHanderReader(this, ctx, channel.getAsyncOut());
+ // if readAsyncListener receives immediate close, it will close this handler and closing this handler sets channel variable to null
+ if(channel != null) {
+ sshWriteAsyncHandler = new AsyncSshHandlerWriter(channel.getAsyncIn());
+ ctx.fireChannelActive();
+ }
}
private synchronized void handleSshSetupFailure(final ChannelHandlerContext ctx, final Throwable e) {
sshWriteAsyncHandler.write(ctx, msg, promise);
}
- private static void handleSshSessionClosed(final ChannelHandlerContext ctx) {
- logger.debug("SSH session closed on channel: {}", ctx.channel());
- ctx.fireChannelInactive();
- }
-
@Override
public synchronized void connect(final ChannelHandlerContext ctx, final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) throws Exception {
this.connectPromise = promise;
channel = null;
promise.setSuccess();
- handleSshSessionClosed(ctx);
- }
-
- /**
- * Listener over async input stream from SSH session.
- * This listeners schedules reads in a loop until the session is closed or read fails.
- */
- private static class SshReadAsyncListener implements SshFutureListener<IoReadFuture>, AutoCloseable {
- private static final int BUFFER_SIZE = 8192;
-
- private final ChannelHandlerContext ctx;
-
- private IoInputStream asyncOut;
- private Buffer buf;
- private IoReadFuture currentReadFuture;
-
- public SshReadAsyncListener(final ChannelHandlerContext ctx, final IoInputStream asyncOut) {
- this.ctx = ctx;
- this.asyncOut = asyncOut;
- buf = new Buffer(BUFFER_SIZE);
- asyncOut.read(buf).addListener(this);
- }
-
- @Override
- public synchronized void operationComplete(final IoReadFuture future) {
- if(future.getException() != null) {
-
- if(asyncOut.isClosed() || asyncOut.isClosing()) {
- // We are closing
- handleSshSessionClosed(ctx);
- } else {
- logger.warn("Exception while reading from SSH remote on channel {}", ctx.channel(), future.getException());
- throw new IllegalStateException("Exception while reading from SSH remote on channel " + ctx.channel(), future.getException());
- }
- }
-
- if (future.getRead() > 0) {
- ctx.fireChannelRead(Unpooled.wrappedBuffer(buf.array(), 0, future.getRead()));
-
- // Schedule next read
- buf = new Buffer(BUFFER_SIZE);
- currentReadFuture = asyncOut.read(buf);
- currentReadFuture.addListener(this);
- }
- }
-
- @Override
- public synchronized void close() {
- // Remove self as listener on close to prevent reading from closed input
- if(currentReadFuture != null) {
- currentReadFuture.removeListener(this);
- }
-
- asyncOut = null;
- }
+ logger.debug("SSH session closed on channel: {}", ctx.channel());
+ ctx.fireChannelInactive();
}
- private static final class SshWriteAsyncHandler implements AutoCloseable {
- public static final int MAX_PENDING_WRITES = 100;
-
- private final ChannelOutboundHandler channelHandler;
- private IoOutputStream asyncIn;
-
- // Counter that holds the amount of pending write messages
- // Pending write can occur in case remote window is full
- // In such case, we need to wait for the pending write to finish
- private int pendingWriteCounter;
- // Last write future, that can be pending
- private IoWriteFuture lastWriteFuture;
-
- public SshWriteAsyncHandler(final ChannelOutboundHandler channelHandler, final IoOutputStream asyncIn) {
- this.channelHandler = channelHandler;
- this.asyncIn = asyncIn;
- }
-
- public synchronized void write(final ChannelHandlerContext ctx, final Object msg, final ChannelPromise promise) {
- try {
- if(asyncIn.isClosed() || asyncIn.isClosing()) {
- handleSshSessionClosed(ctx);
- } else {
- lastWriteFuture = asyncIn.write(toBuffer(msg));
- lastWriteFuture.addListener(new SshFutureListener<IoWriteFuture>() {
-
- @Override
- public void operationComplete(final IoWriteFuture future) {
- ((ByteBuf) msg).release();
-
- // Notify success or failure
- if (future.isWritten()) {
- promise.setSuccess();
- }
- promise.setFailure(future.getException());
-
- // Reset last pending future
- synchronized (SshWriteAsyncHandler.this) {
- lastWriteFuture = null;
- }
- }
- });
- }
- } catch (final WritePendingException e) {
- // Check limit for pending writes
- pendingWriteCounter++;
- if(pendingWriteCounter > MAX_PENDING_WRITES) {
- handlePendingFailed(ctx, new IllegalStateException("Too much pending writes(" + MAX_PENDING_WRITES + ") on channel: " + ctx.channel() +
- ", remote window is not getting read or is too small"));
- }
-
- logger.debug("Write pending to SSH remote on channel: {}, current pending count: {}", ctx.channel(), pendingWriteCounter);
-
- // In case of pending, re-invoke write after pending is finished
- lastWriteFuture.addListener(new SshFutureListener<IoWriteFuture>() {
- @Override
- public void operationComplete(final IoWriteFuture future) {
- if(future.isWritten()) {
- synchronized (SshWriteAsyncHandler.this) {
- // Pending done, decrease counter
- pendingWriteCounter--;
- }
- write(ctx, msg, promise);
- } else {
- // Cannot reschedule pending, fail
- handlePendingFailed(ctx, e);
- }
- }
-
- });
- }
- }
-
- private void handlePendingFailed(final ChannelHandlerContext ctx, final Exception e) {
- logger.warn("Exception while writing to SSH remote on channel {}", ctx.channel(), e);
- try {
- channelHandler.disconnect(ctx, ctx.newPromise());
- } catch (final Exception ex) {
- // This should not happen
- throw new IllegalStateException(ex);
- }
- }
-
- @Override
- public void close() {
- asyncIn = null;
- }
-
- private Buffer toBuffer(final Object msg) {
- // TODO Buffer vs ByteBuf translate, Can we handle that better ?
- Preconditions.checkState(msg instanceof ByteBuf);
- final ByteBuf byteBuf = (ByteBuf) msg;
- final byte[] temp = new byte[byteBuf.readableBytes()];
- byteBuf.readBytes(temp, 0, byteBuf.readableBytes());
- return new Buffer(temp);
- }
-
- }
}