import io.netty.util.concurrent.GenericFutureListener;
import java.io.IOException;
import java.net.SocketAddress;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.sshd.client.SshClient;
import org.apache.sshd.client.channel.ClientChannel;
import org.apache.sshd.client.future.AuthFuture;
private final AuthenticationHandler authenticationHandler;
private final SshClient sshClient;
+ private final AtomicBoolean isDisconnected = new AtomicBoolean();
private Future<?> negotiationFuture;
private AsyncSshHandlerReader sshReadAsyncListener;
disconnect(ctx, promise);
}
- @SuppressWarnings("checkstyle:IllegalCatch")
@Override
- public synchronized void disconnect(final ChannelHandlerContext ctx, final ChannelPromise promise) {
+ public void disconnect(final ChannelHandlerContext ctx, final ChannelPromise promise) {
+ if (isDisconnected.compareAndSet(false, true)) {
+ safelyDisconnect(ctx, promise);
+ }
+ }
+
+ @SuppressWarnings("checkstyle:IllegalCatch")
+ private synchronized void safelyDisconnect(final ChannelHandlerContext ctx, final ChannelPromise promise) {
LOG.trace("Closing SSH session on channel: {} with connect promise in state: {}",
- ctx.channel(),connectPromise);
+ ctx.channel(), connectPromise);
// If we have already succeeded and the session was dropped after,
// we need to fire inactive to notify reconnect logic
promise.setSuccess();
LOG.debug("SSH session closed on channel: {}", ctx.channel());
}
-
}
}
@Override
- public synchronized void operationComplete(final IoReadFuture future) {
- if (future.getException() != null) {
+ public void operationComplete(final IoReadFuture future) {
+ if (checkDisconnect(future)) {
+ invokeDisconnect();
+ }
+ }
+ private synchronized boolean checkDisconnect(final IoReadFuture future) {
+ if (future.getException() != null) {
//if asyncout is already set to null by close method, do nothing
if (asyncOut == null) {
- return;
+ return false;
}
if (asyncOut.isClosed() || asyncOut.isClosing()) {
} else {
LOG.warn("Exception while reading from SSH remote on channel {}", channelId, future.getException());
}
- invokeDisconnect();
- return;
- }
-
- if (future.getRead() > 0) {
+ return true;
+ } else if (future.getRead() > 0) {
final ByteBuf msg = Unpooled.wrappedBuffer(buf.array(), 0, future.getRead());
if (LOG.isTraceEnabled()) {
LOG.trace("Reading message on channel: {}, message: {}",
currentReadFuture = asyncOut.read(buf);
currentReadFuture.addListener(this);
}
+ return false;
}
+ /**
+ * Closing of the {@link AsyncSshHandlerReader}. This method should never be called with any locks held since
+ * call to {@link AutoCloseable#close()} can be a source of ABBA deadlock.
+ */
@SuppressWarnings("checkstyle:IllegalCatch")
private void invokeDisconnect() {
try {