Fix another state-keeping thinko
[netconf.git] / netconf / netconf-netty-util / src / main / java / org / opendaylight / netconf / nettyutil / handler / ssh / client / AsyncSshHandlerReader.java
index c79b71308ace4d270c15d93564f91cd1877ec880..e6ba03155580090ad5e6fa336cbf2b287238ca69 100644 (file)
@@ -10,10 +10,11 @@ package org.opendaylight.netconf.nettyutil.handler.ssh.client;
 
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
-import org.apache.sshd.common.future.SshFutureListener;
-import org.apache.sshd.common.io.IoInputStream;
-import org.apache.sshd.common.io.IoReadFuture;
-import org.apache.sshd.common.util.Buffer;
+import org.opendaylight.netconf.shaded.sshd.common.future.SshFutureListener;
+import org.opendaylight.netconf.shaded.sshd.common.io.IoInputStream;
+import org.opendaylight.netconf.shaded.sshd.common.io.IoReadFuture;
+import org.opendaylight.netconf.shaded.sshd.common.util.buffer.Buffer;
+import org.opendaylight.netconf.shaded.sshd.common.util.buffer.ByteArrayBuffer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -41,17 +42,22 @@ public final class AsyncSshHandlerReader implements SshFutureListener<IoReadFutu
         this.readHandler = readHandler;
         this.channelId = channelId;
         this.asyncOut = asyncOut;
-        buf = new Buffer(BUFFER_SIZE);
+        buf = new ByteArrayBuffer(BUFFER_SIZE);
         asyncOut.read(buf).addListener(this);
     }
 
     @Override
-    public synchronized void operationComplete(final IoReadFuture future) {
-        if (future.getException() != null) {
+    public void operationComplete(final IoReadFuture future) {
+        if (checkDisconnect(future)) {
+            invokeDisconnect();
+        }
+    }
 
+    private synchronized boolean checkDisconnect(final IoReadFuture future) {
+        if (future.getException() != null) {
             //if asyncout is already set to null by close method, do nothing
             if (asyncOut == null) {
-                return;
+                return false;
             }
 
             if (asyncOut.isClosed() || asyncOut.isClosing()) {
@@ -60,11 +66,8 @@ public final class AsyncSshHandlerReader implements SshFutureListener<IoReadFutu
             } else {
                 LOG.warn("Exception while reading from SSH remote on channel {}", channelId, future.getException());
             }
-            invokeDisconnect();
-            return;
-        }
-
-        if (future.getRead() > 0) {
+            return true;
+        } else if (future.getRead() > 0) {
             final ByteBuf msg = Unpooled.wrappedBuffer(buf.array(), 0, future.getRead());
             if (LOG.isTraceEnabled()) {
                 LOG.trace("Reading message on channel: {}, message: {}",
@@ -73,12 +76,17 @@ public final class AsyncSshHandlerReader implements SshFutureListener<IoReadFutu
             readHandler.onMessageRead(msg);
 
             // Schedule next read
-            buf = new Buffer(BUFFER_SIZE);
+            buf = new ByteArrayBuffer(BUFFER_SIZE);
             currentReadFuture = asyncOut.read(buf);
             currentReadFuture.addListener(this);
         }
+        return false;
     }
 
+    /**
+     * Closing of the {@link AsyncSshHandlerReader}. This method should never be called with any locks held since
+     * call to {@link AutoCloseable#close()} can be a source of ABBA deadlock.
+     */
     @SuppressWarnings("checkstyle:IllegalCatch")
     private void invokeDisconnect() {
         try {