Fix another state-keeping thinko
[netconf.git] / netconf / netconf-netty-util / src / main / java / org / opendaylight / netconf / nettyutil / handler / ssh / client / AsyncSshHandlerReader.java
index a6da457153b6ea98321da9f220cdecceed705cea..e6ba03155580090ad5e6fa336cbf2b287238ca69 100644 (file)
@@ -10,10 +10,11 @@ package org.opendaylight.netconf.nettyutil.handler.ssh.client;
 
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
-import org.apache.sshd.common.future.SshFutureListener;
-import org.apache.sshd.common.io.IoInputStream;
-import org.apache.sshd.common.io.IoReadFuture;
-import org.apache.sshd.common.util.Buffer;
+import org.opendaylight.netconf.shaded.sshd.common.future.SshFutureListener;
+import org.opendaylight.netconf.shaded.sshd.common.io.IoInputStream;
+import org.opendaylight.netconf.shaded.sshd.common.io.IoReadFuture;
+import org.opendaylight.netconf.shaded.sshd.common.util.buffer.Buffer;
+import org.opendaylight.netconf.shaded.sshd.common.util.buffer.ByteArrayBuffer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -25,7 +26,7 @@ public final class AsyncSshHandlerReader implements SshFutureListener<IoReadFutu
 
     private static final Logger LOG = LoggerFactory.getLogger(AsyncSshHandlerReader.class);
 
-    private static final int BUFFER_SIZE = 8192;
+    private static final int BUFFER_SIZE = 2048;
 
     private final AutoCloseable connectionClosedCallback;
     private final ReadMsgHandler readHandler;
@@ -35,48 +36,58 @@ public final class AsyncSshHandlerReader implements SshFutureListener<IoReadFutu
     private Buffer buf;
     private IoReadFuture currentReadFuture;
 
-    public AsyncSshHandlerReader(final AutoCloseable connectionClosedCallback, final ReadMsgHandler readHandler, final String channelId, final IoInputStream asyncOut) {
+    public AsyncSshHandlerReader(final AutoCloseable connectionClosedCallback, final ReadMsgHandler readHandler,
+                                 final String channelId, final IoInputStream asyncOut) {
         this.connectionClosedCallback = connectionClosedCallback;
         this.readHandler = readHandler;
         this.channelId = channelId;
         this.asyncOut = asyncOut;
-        buf = new Buffer(BUFFER_SIZE);
+        buf = new ByteArrayBuffer(BUFFER_SIZE);
         asyncOut.read(buf).addListener(this);
     }
 
     @Override
-    public synchronized void operationComplete(final IoReadFuture future) {
-        if(future.getException() != null) {
+    public void operationComplete(final IoReadFuture future) {
+        if (checkDisconnect(future)) {
+            invokeDisconnect();
+        }
+    }
 
+    private synchronized boolean checkDisconnect(final IoReadFuture future) {
+        if (future.getException() != null) {
             //if asyncout is already set to null by close method, do nothing
-            if(asyncOut == null) {
-                return;
+            if (asyncOut == null) {
+                return false;
             }
 
-            if(asyncOut.isClosed() || asyncOut.isClosing()) {
+            if (asyncOut.isClosed() || asyncOut.isClosing()) {
                 // Ssh dropped
                 LOG.debug("Ssh session dropped on channel: {}", channelId, future.getException());
             } else {
                 LOG.warn("Exception while reading from SSH remote on channel {}", channelId, future.getException());
             }
-            invokeDisconnect();
-            return;
-        }
-
-        if (future.getRead() > 0) {
+            return true;
+        } else if (future.getRead() > 0) {
             final ByteBuf msg = Unpooled.wrappedBuffer(buf.array(), 0, future.getRead());
-            if(LOG.isTraceEnabled()) {
-                LOG.trace("Reading message on channel: {}, message: {}", channelId, AsyncSshHandlerWriter.byteBufToString(msg));
+            if (LOG.isTraceEnabled()) {
+                LOG.trace("Reading message on channel: {}, message: {}",
+                        channelId, AsyncSshHandlerWriter.byteBufToString(msg));
             }
             readHandler.onMessageRead(msg);
 
             // Schedule next read
-            buf = new Buffer(BUFFER_SIZE);
+            buf = new ByteArrayBuffer(BUFFER_SIZE);
             currentReadFuture = asyncOut.read(buf);
             currentReadFuture.addListener(this);
         }
+        return false;
     }
 
+    /**
+     * Closing of the {@link AsyncSshHandlerReader}. This method should never be called with any locks held since
+     * call to {@link AutoCloseable#close()} can be a source of ABBA deadlock.
+     */
+    @SuppressWarnings("checkstyle:IllegalCatch")
     private void invokeDisconnect() {
         try {
             connectionClosedCallback.close();
@@ -89,7 +100,7 @@ public final class AsyncSshHandlerReader implements SshFutureListener<IoReadFutu
     @Override
     public synchronized void close() {
         // Remove self as listener on close to prevent reading from closed input
-        if(currentReadFuture != null) {
+        if (currentReadFuture != null) {
             currentReadFuture.removeListener(this);
             currentReadFuture = null;
         }