X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fnetconf%2Fnetconf-netty-util%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fnetconf%2Fnettyutil%2Fhandler%2Fssh%2Fclient%2FAsyncSshHandlerWriter.java;h=667a4f6bf8dd23414986ba1de71598861155565e;hp=757dc1a2ba334d4169718e751a553d309e6240f6;hb=c448e270faf33afb6a04929a064ce9c9ce9556ed;hpb=e8ee8a8897013fb90376e21ccd9fc54a98a8c251 diff --git a/opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/controller/netconf/nettyutil/handler/ssh/client/AsyncSshHandlerWriter.java b/opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/controller/netconf/nettyutil/handler/ssh/client/AsyncSshHandlerWriter.java index 757dc1a2ba..667a4f6bf8 100644 --- a/opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/controller/netconf/nettyutil/handler/ssh/client/AsyncSshHandlerWriter.java +++ b/opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/controller/netconf/nettyutil/handler/ssh/client/AsyncSshHandlerWriter.java @@ -40,7 +40,7 @@ public final class AsyncSshHandlerWriter implements AutoCloseable { // 2. At this level we might be dealing with Chunks of messages(not whole messages) and unexpected behavior might occur // when we send/queue 1 chunk and fail the other chunks - private IoOutputStream asyncIn; + private volatile IoOutputStream asyncIn; // Order has to be preserved for queued writes private final Deque pending = new LinkedList<>(); @@ -49,26 +49,36 @@ public final class AsyncSshHandlerWriter implements AutoCloseable { this.asyncIn = asyncIn; } - public synchronized void write(final ChannelHandlerContext ctx, + public void write(final ChannelHandlerContext ctx, final Object msg, final ChannelPromise promise) { - // TODO check for isClosed, isClosing might be performed by mina SSH internally and is not required here - // If we are closed/closing, set immediate fail - if (asyncIn == null || asyncIn.isClosed() || asyncIn.isClosing()) { + if (asyncIn == null) { promise.setFailure(new IllegalStateException("Channel closed")); - } else { - final ByteBuf byteBufMsg = (ByteBuf) msg; - if (pending.isEmpty() == false) { - queueRequest(ctx, byteBufMsg, promise); - return; - } + return; + } + // synchronized block due to deadlock that happens on ssh window resize + // writes and pending writes would lock the underlyinch channel session + // window resize write would try to write the message on an already locked channelSession + // while the pending write was in progress from the write callback + synchronized (asyncIn) { + // TODO check for isClosed, isClosing might be performed by mina SSH internally and is not required here + // If we are closed/closing, set immediate fail + if (asyncIn.isClosed() || asyncIn.isClosing()) { + promise.setFailure(new IllegalStateException("Channel closed")); + } else { + final ByteBuf byteBufMsg = (ByteBuf) msg; + if (pending.isEmpty() == false) { + queueRequest(ctx, byteBufMsg, promise); + return; + } - writeWithPendingDetection(ctx, promise, byteBufMsg, false); + writeWithPendingDetection(ctx, promise, byteBufMsg, false); + } } } //sending message with pending //if resending message not succesfull, then attribute wasPending is true - private void writeWithPendingDetection(final ChannelHandlerContext ctx, final ChannelPromise promise, final ByteBuf byteBufMsg, boolean wasPending) { + private void writeWithPendingDetection(final ChannelHandlerContext ctx, final ChannelPromise promise, final ByteBuf byteBufMsg, final boolean wasPending) { try { if (LOG.isTraceEnabled()) { @@ -78,35 +88,40 @@ public final class AsyncSshHandlerWriter implements AutoCloseable { @Override public void operationComplete(final IoWriteFuture future) { - if (LOG.isTraceEnabled()) { - LOG.trace("Ssh write request finished on channel: {} with result: {}: and ex:{}, message: {}", - ctx.channel(), future.isWritten(), future.getException(), byteBufToString(byteBufMsg)); - } - - // Notify success or failure - if (future.isWritten()) { - promise.setSuccess(); - } else { - LOG.warn("Ssh write request failed on channel: {} for message: {}", ctx.channel(), byteBufToString(byteBufMsg), future.getException()); - promise.setFailure(future.getException()); + // synchronized block due to deadlock that happens on ssh window resize + // writes and pending writes would lock the underlyinch channel session + // window resize write would try to write the message on an already locked channelSession, + // while the pending write was in progress from the write callback + synchronized (asyncIn) { + if (LOG.isTraceEnabled()) { + LOG.trace("Ssh write request finished on channel: {} with result: {}: and ex:{}, message: {}", + ctx.channel(), future.isWritten(), future.getException(), byteBufToString(byteBufMsg)); + } + + // Notify success or failure + if (future.isWritten()) { + promise.setSuccess(); + } else { + LOG.warn("Ssh write request failed on channel: {} for message: {}", ctx.channel(), byteBufToString(byteBufMsg), future.getException()); + promise.setFailure(future.getException()); + } + + // Not needed anymore, release + byteBufMsg.release(); + + //rescheduling message from queue after successfully sent + if (wasPending) { + byteBufMsg.resetReaderIndex(); + pending.remove(); + } } - // Not needed anymore, release - byteBufMsg.release(); - // Check pending queue and schedule next // At this time we are guaranteed that we are not in pending state anymore so the next request should succeed writePendingIfAny(); - } }); - //rescheduling message from queue after successfully sent - if(wasPending){ - byteBufMsg.resetReaderIndex(); - pending.remove(); - } - } catch (final WritePendingException e) { if(wasPending == false){ @@ -115,19 +130,20 @@ public final class AsyncSshHandlerWriter implements AutoCloseable { } } - private synchronized void writePendingIfAny() { + private void writePendingIfAny() { + synchronized (asyncIn) { + if (pending.peek() == null) { + return; + } - if (pending.peek() == null) { - return; - } + final PendingWriteRequest pendingWrite = pending.peek(); + final ByteBuf msg = pendingWrite.msg; + if (LOG.isTraceEnabled()) { + LOG.trace("Writing pending request on channel: {}, message: {}", pendingWrite.ctx.channel(), byteBufToString(msg)); + } - final PendingWriteRequest pendingWrite = pending.peek(); - final ByteBuf msg = pendingWrite.msg; - if (LOG.isTraceEnabled()) { - LOG.trace("Writing pending request on channel: {}, message: {}", pendingWrite.ctx.channel(), byteBufToString(msg)); + writeWithPendingDetection(pendingWrite.ctx, pendingWrite.promise, msg, true); } - - writeWithPendingDetection(pendingWrite.ctx, pendingWrite.promise, msg, true); } public static String byteBufToString(final ByteBuf msg) { @@ -151,7 +167,7 @@ public final class AsyncSshHandlerWriter implements AutoCloseable { } @Override - public synchronized void close() { + public void close() { asyncIn = null; }