X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fnetconf%2Fnetconf-netty-util%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fnetconf%2Fnettyutil%2Fhandler%2Fssh%2Fclient%2FAsyncSshHandlerWriter.java;h=667a4f6bf8dd23414986ba1de71598861155565e;hp=10941021bceaa1e12433ff5291e7d55741ad822a;hb=c448e270faf33afb6a04929a064ce9c9ce9556ed;hpb=cd50f92c60580b546a696aab7c3ff4fbf3f9a5f0 diff --git a/opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/controller/netconf/nettyutil/handler/ssh/client/AsyncSshHandlerWriter.java b/opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/controller/netconf/nettyutil/handler/ssh/client/AsyncSshHandlerWriter.java index 10941021bc..667a4f6bf8 100644 --- a/opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/controller/netconf/nettyutil/handler/ssh/client/AsyncSshHandlerWriter.java +++ b/opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/controller/netconf/nettyutil/handler/ssh/client/AsyncSshHandlerWriter.java @@ -40,7 +40,7 @@ public final class AsyncSshHandlerWriter implements AutoCloseable { // 2. At this level we might be dealing with Chunks of messages(not whole messages) and unexpected behavior might occur // when we send/queue 1 chunk and fail the other chunks - private IoOutputStream asyncIn; + private volatile IoOutputStream asyncIn; // Order has to be preserved for queued writes private final Deque pending = new LinkedList<>(); @@ -49,20 +49,30 @@ public final class AsyncSshHandlerWriter implements AutoCloseable { this.asyncIn = asyncIn; } - public synchronized void write(final ChannelHandlerContext ctx, + public void write(final ChannelHandlerContext ctx, final Object msg, final ChannelPromise promise) { - // TODO check for isClosed, isClosing might be performed by mina SSH internally and is not required here - // If we are closed/closing, set immediate fail - if (asyncIn == null || asyncIn.isClosed() || asyncIn.isClosing()) { + if (asyncIn == null) { promise.setFailure(new IllegalStateException("Channel closed")); - } else { - final ByteBuf byteBufMsg = (ByteBuf) msg; - if (pending.isEmpty() == false) { - queueRequest(ctx, byteBufMsg, promise); - return; - } + return; + } + // synchronized block due to deadlock that happens on ssh window resize + // writes and pending writes would lock the underlyinch channel session + // window resize write would try to write the message on an already locked channelSession + // while the pending write was in progress from the write callback + synchronized (asyncIn) { + // TODO check for isClosed, isClosing might be performed by mina SSH internally and is not required here + // If we are closed/closing, set immediate fail + if (asyncIn.isClosed() || asyncIn.isClosing()) { + promise.setFailure(new IllegalStateException("Channel closed")); + } else { + final ByteBuf byteBufMsg = (ByteBuf) msg; + if (pending.isEmpty() == false) { + queueRequest(ctx, byteBufMsg, promise); + return; + } - writeWithPendingDetection(ctx, promise, byteBufMsg, false); + writeWithPendingDetection(ctx, promise, byteBufMsg, false); + } } } @@ -78,23 +88,27 @@ public final class AsyncSshHandlerWriter implements AutoCloseable { @Override public void operationComplete(final IoWriteFuture future) { - if (LOG.isTraceEnabled()) { - LOG.trace("Ssh write request finished on channel: {} with result: {}: and ex:{}, message: {}", - ctx.channel(), future.isWritten(), future.getException(), byteBufToString(byteBufMsg)); - } + // synchronized block due to deadlock that happens on ssh window resize + // writes and pending writes would lock the underlyinch channel session + // window resize write would try to write the message on an already locked channelSession, + // while the pending write was in progress from the write callback + synchronized (asyncIn) { + if (LOG.isTraceEnabled()) { + LOG.trace("Ssh write request finished on channel: {} with result: {}: and ex:{}, message: {}", + ctx.channel(), future.isWritten(), future.getException(), byteBufToString(byteBufMsg)); + } - // Notify success or failure - if (future.isWritten()) { - promise.setSuccess(); - } else { - LOG.warn("Ssh write request failed on channel: {} for message: {}", ctx.channel(), byteBufToString(byteBufMsg), future.getException()); - promise.setFailure(future.getException()); - } + // Notify success or failure + if (future.isWritten()) { + promise.setSuccess(); + } else { + LOG.warn("Ssh write request failed on channel: {} for message: {}", ctx.channel(), byteBufToString(byteBufMsg), future.getException()); + promise.setFailure(future.getException()); + } - // Not needed anymore, release - byteBufMsg.release(); + // Not needed anymore, release + byteBufMsg.release(); - synchronized (AsyncSshHandlerWriter.this) { //rescheduling message from queue after successfully sent if (wasPending) { byteBufMsg.resetReaderIndex(); @@ -105,7 +119,6 @@ public final class AsyncSshHandlerWriter implements AutoCloseable { // Check pending queue and schedule next // At this time we are guaranteed that we are not in pending state anymore so the next request should succeed writePendingIfAny(); - } }); @@ -117,19 +130,20 @@ public final class AsyncSshHandlerWriter implements AutoCloseable { } } - private synchronized void writePendingIfAny() { + private void writePendingIfAny() { + synchronized (asyncIn) { + if (pending.peek() == null) { + return; + } - if (pending.peek() == null) { - return; - } + final PendingWriteRequest pendingWrite = pending.peek(); + final ByteBuf msg = pendingWrite.msg; + if (LOG.isTraceEnabled()) { + LOG.trace("Writing pending request on channel: {}, message: {}", pendingWrite.ctx.channel(), byteBufToString(msg)); + } - final PendingWriteRequest pendingWrite = pending.peek(); - final ByteBuf msg = pendingWrite.msg; - if (LOG.isTraceEnabled()) { - LOG.trace("Writing pending request on channel: {}, message: {}", pendingWrite.ctx.channel(), byteBufToString(msg)); + writeWithPendingDetection(pendingWrite.ctx, pendingWrite.promise, msg, true); } - - writeWithPendingDetection(pendingWrite.ctx, pendingWrite.promise, msg, true); } public static String byteBufToString(final ByteBuf msg) { @@ -153,7 +167,7 @@ public final class AsyncSshHandlerWriter implements AutoCloseable { } @Override - public synchronized void close() { + public void close() { asyncIn = null; }