Merge "Bug 1025: Fixed incorrect revision in sal-remote-augment, which caused log...
[controller.git] / opendaylight / netconf / netconf-netty-util / src / main / java / org / opendaylight / controller / netconf / nettyutil / handler / ssh / client / AsyncSshHandler.java
index 369c013832790eef19dc2b751baa6a9564bb7800..3bd72320232bb7f912a316469b816fa76952f0c4 100644 (file)
@@ -9,10 +9,7 @@
 package org.opendaylight.controller.netconf.nettyutil.handler.ssh.client;
 
 import com.google.common.base.Preconditions;
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.Unpooled;
 import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelOutboundHandler;
 import io.netty.channel.ChannelOutboundHandlerAdapter;
 import io.netty.channel.ChannelPromise;
 import java.io.IOException;
@@ -25,12 +22,6 @@ import org.apache.sshd.client.future.ConnectFuture;
 import org.apache.sshd.client.future.OpenFuture;
 import org.apache.sshd.common.future.CloseFuture;
 import org.apache.sshd.common.future.SshFutureListener;
-import org.apache.sshd.common.io.IoInputStream;
-import org.apache.sshd.common.io.IoOutputStream;
-import org.apache.sshd.common.io.IoReadFuture;
-import org.apache.sshd.common.io.IoWriteFuture;
-import org.apache.sshd.common.io.WritePendingException;
-import org.apache.sshd.common.util.Buffer;
 import org.opendaylight.controller.netconf.nettyutil.handler.ssh.authentication.AuthenticationHandler;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -56,8 +47,8 @@ public class AsyncSshHandler extends ChannelOutboundHandlerAdapter {
     private final AuthenticationHandler authenticationHandler;
     private final SshClient sshClient;
 
-    private SshReadAsyncListener sshReadAsyncListener;
-    private SshWriteAsyncHandler sshWriteAsyncHandler;
+    private AsyncSshHanderReader sshReadAsyncListener;
+    private AsyncSshHandlerWriter sshWriteAsyncHandler;
 
     private ClientChannel channel;
     private ClientSession session;
@@ -147,10 +138,10 @@ public class AsyncSshHandler extends ChannelOutboundHandlerAdapter {
         connectPromise.setSuccess();
         connectPromise = null;
 
-        sshReadAsyncListener = new SshReadAsyncListener(this, ctx, channel.getAsyncOut());
+        sshReadAsyncListener = new AsyncSshHanderReader(this, ctx, channel.getAsyncOut());
         // if readAsyncListener receives immediate close, it will close this handler and closing this handler sets channel variable to null
         if(channel != null) {
-            sshWriteAsyncHandler = new SshWriteAsyncHandler(this, channel.getAsyncIn());
+            sshWriteAsyncHandler = new AsyncSshHandlerWriter(channel.getAsyncIn());
             ctx.fireChannelActive();
         }
     }
@@ -207,173 +198,4 @@ public class AsyncSshHandler extends ChannelOutboundHandlerAdapter {
         ctx.fireChannelInactive();
     }
 
-    /**
-     * Listener over async input stream from SSH session.
-     * This listeners schedules reads in a loop until the session is closed or read fails.
-     */
-    private static class SshReadAsyncListener implements SshFutureListener<IoReadFuture>, AutoCloseable {
-        private static final int BUFFER_SIZE = 8192;
-
-        private final ChannelOutboundHandler asyncSshHandler;
-        private final ChannelHandlerContext ctx;
-
-        private IoInputStream asyncOut;
-        private Buffer buf;
-        private IoReadFuture currentReadFuture;
-
-        public SshReadAsyncListener(final ChannelOutboundHandler asyncSshHandler, final ChannelHandlerContext ctx, final IoInputStream asyncOut) {
-            this.asyncSshHandler = asyncSshHandler;
-            this.ctx = ctx;
-            this.asyncOut = asyncOut;
-            buf = new Buffer(BUFFER_SIZE);
-            asyncOut.read(buf).addListener(this);
-        }
-
-        @Override
-        public synchronized void operationComplete(final IoReadFuture future) {
-            if(future.getException() != null) {
-                if(asyncOut.isClosed() || asyncOut.isClosing()) {
-                    // Ssh dropped
-                    logger.debug("Ssh session dropped on channel: {}", ctx.channel(), future.getException());
-                } else {
-                    logger.warn("Exception while reading from SSH remote on channel {}", ctx.channel(), future.getException());
-                }
-                invokeDisconnect();
-                return;
-            }
-
-            if (future.getRead() > 0) {
-                ctx.fireChannelRead(Unpooled.wrappedBuffer(buf.array(), 0, future.getRead()));
-
-                // Schedule next read
-                buf = new Buffer(BUFFER_SIZE);
-                currentReadFuture = asyncOut.read(buf);
-                currentReadFuture.addListener(this);
-            }
-        }
-
-        private void invokeDisconnect() {
-            try {
-                asyncSshHandler.disconnect(ctx, ctx.newPromise());
-            } catch (final Exception e) {
-                // This should not happen
-                throw new IllegalStateException(e);
-            }
-        }
-
-        @Override
-        public synchronized void close() {
-            // Remove self as listener on close to prevent reading from closed input
-            if(currentReadFuture != null) {
-                currentReadFuture.removeListener(this);
-            }
-
-            asyncOut = null;
-        }
-    }
-
-    private static final class SshWriteAsyncHandler implements AutoCloseable {
-        public static final int MAX_PENDING_WRITES = 100;
-
-        private final ChannelOutboundHandler channelHandler;
-        private IoOutputStream asyncIn;
-
-        // Counter that holds the amount of pending write messages
-        // Pending write can occur in case remote window is full
-        // In such case, we need to wait for the pending write to finish
-        private int pendingWriteCounter;
-        // Last write future, that can be pending
-        private IoWriteFuture lastWriteFuture;
-
-        public SshWriteAsyncHandler(final ChannelOutboundHandler channelHandler, final IoOutputStream asyncIn) {
-            this.channelHandler = channelHandler;
-            this.asyncIn = asyncIn;
-        }
-
-        int c = 0;
-
-        public synchronized void write(final ChannelHandlerContext ctx, final Object msg, final ChannelPromise promise) {
-            try {
-                if(asyncIn == null || asyncIn.isClosed() || asyncIn.isClosing()) {
-                    // If we are closed/closing, set immediate fail
-                    promise.setFailure(new IllegalStateException("Channel closed"));
-                } else {
-                    lastWriteFuture = asyncIn.write(toBuffer(msg));
-                    lastWriteFuture.addListener(new SshFutureListener<IoWriteFuture>() {
-
-                        @Override
-                        public void operationComplete(final IoWriteFuture future) {
-                            ((ByteBuf) msg).release();
-
-                            // Notify success or failure
-                            if (future.isWritten()) {
-                                promise.setSuccess();
-                            } else {
-                                promise.setFailure(future.getException());
-                            }
-
-                            // Reset last pending future
-                            synchronized (SshWriteAsyncHandler.this) {
-                                lastWriteFuture = null;
-                            }
-                        }
-                    });
-                }
-            } catch (final WritePendingException e) {
-                // Check limit for pending writes
-                pendingWriteCounter++;
-                if(pendingWriteCounter > MAX_PENDING_WRITES) {
-                    promise.setFailure(e);
-                    handlePendingFailed(ctx, new IllegalStateException("Too much pending writes(" + MAX_PENDING_WRITES + ") on channel: " + ctx.channel() +
-                            ", remote window is not getting read or is too small"));
-                }
-
-                logger.debug("Write pending to SSH remote on channel: {}, current pending count: {}", ctx.channel(), pendingWriteCounter);
-
-                // In case of pending, re-invoke write after pending is finished
-                Preconditions.checkNotNull(lastWriteFuture, "Write is pending, but there was no previous write attempt", e);
-                lastWriteFuture.addListener(new SshFutureListener<IoWriteFuture>() {
-                    @Override
-                    public void operationComplete(final IoWriteFuture future) {
-                        if (future.isWritten()) {
-                            synchronized (SshWriteAsyncHandler.this) {
-                                // Pending done, decrease counter
-                                pendingWriteCounter--;
-                            }
-                            write(ctx, msg, promise);
-                        } else {
-                            // Cannot reschedule pending, fail
-                            handlePendingFailed(ctx, e);
-                        }
-                    }
-
-                });
-            }
-        }
-
-        private void handlePendingFailed(final ChannelHandlerContext ctx, final Exception e) {
-            logger.warn("Exception while writing to SSH remote on channel {}", ctx.channel(), e);
-            try {
-                channelHandler.disconnect(ctx, ctx.newPromise());
-            } catch (final Exception ex) {
-                // This should not happen
-                throw new IllegalStateException(ex);
-            }
-        }
-
-        @Override
-        public void close() {
-            asyncIn = null;
-        }
-
-        private Buffer toBuffer(final Object msg) {
-            // TODO Buffer vs ByteBuf translate, Can we handle that better ?
-            Preconditions.checkState(msg instanceof ByteBuf);
-            final ByteBuf byteBuf = (ByteBuf) msg;
-            final byte[] temp = new byte[byteBuf.readableBytes()];
-            byteBuf.readBytes(temp, 0, byteBuf.readableBytes());
-            return new Buffer(temp);
-        }
-
-    }
 }