X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fnetconf%2Fnetconf-netty-util%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fnetconf%2Fnettyutil%2Fhandler%2Fssh%2Fclient%2FAsyncSshHandlerWriter.java;h=667a4f6bf8dd23414986ba1de71598861155565e;hp=1e976ce6a1394c31e0bbd8dd8746463193a3ac5c;hb=c448e270faf33afb6a04929a064ce9c9ce9556ed;hpb=a2563a94253f9c2603e0ab25b8f412ea07fcf51d diff --git a/opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/controller/netconf/nettyutil/handler/ssh/client/AsyncSshHandlerWriter.java b/opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/controller/netconf/nettyutil/handler/ssh/client/AsyncSshHandlerWriter.java index 1e976ce6a1..667a4f6bf8 100644 --- a/opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/controller/netconf/nettyutil/handler/ssh/client/AsyncSshHandlerWriter.java +++ b/opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/controller/netconf/nettyutil/handler/ssh/client/AsyncSshHandlerWriter.java @@ -40,7 +40,7 @@ public final class AsyncSshHandlerWriter implements AutoCloseable { // 2. At this level we might be dealing with Chunks of messages(not whole messages) and unexpected behavior might occur // when we send/queue 1 chunk and fail the other chunks - private IoOutputStream asyncIn; + private volatile IoOutputStream asyncIn; // Order has to be preserved for queued writes private final Deque pending = new LinkedList<>(); @@ -49,25 +49,38 @@ public final class AsyncSshHandlerWriter implements AutoCloseable { this.asyncIn = asyncIn; } - public synchronized void write(final ChannelHandlerContext ctx, + public void write(final ChannelHandlerContext ctx, final Object msg, final ChannelPromise promise) { - // TODO check for isClosed, isClosing might be performed by mina SSH internally and is not required here - // If we are closed/closing, set immediate fail - if (asyncIn == null || asyncIn.isClosed() || asyncIn.isClosing()) { + if (asyncIn == null) { promise.setFailure(new IllegalStateException("Channel closed")); - } else { - final ByteBuf byteBufMsg = (ByteBuf) msg; - if (pending.isEmpty() == false) { - queueRequest(ctx, byteBufMsg, promise); - return; - } + return; + } + // synchronized block due to deadlock that happens on ssh window resize + // writes and pending writes would lock the underlyinch channel session + // window resize write would try to write the message on an already locked channelSession + // while the pending write was in progress from the write callback + synchronized (asyncIn) { + // TODO check for isClosed, isClosing might be performed by mina SSH internally and is not required here + // If we are closed/closing, set immediate fail + if (asyncIn.isClosed() || asyncIn.isClosing()) { + promise.setFailure(new IllegalStateException("Channel closed")); + } else { + final ByteBuf byteBufMsg = (ByteBuf) msg; + if (pending.isEmpty() == false) { + queueRequest(ctx, byteBufMsg, promise); + return; + } - writeWithPendingDetection(ctx, promise, byteBufMsg); + writeWithPendingDetection(ctx, promise, byteBufMsg, false); + } } } - private void writeWithPendingDetection(final ChannelHandlerContext ctx, final ChannelPromise promise, final ByteBuf byteBufMsg) { + //sending message with pending + //if resending message not succesfull, then attribute wasPending is true + private void writeWithPendingDetection(final ChannelHandlerContext ctx, final ChannelPromise promise, final ByteBuf byteBufMsg, final boolean wasPending) { try { + if (LOG.isTraceEnabled()) { LOG.trace("Writing request on channel: {}, message: {}", ctx.channel(), byteBufToString(byteBufMsg)); } @@ -75,49 +88,65 @@ public final class AsyncSshHandlerWriter implements AutoCloseable { @Override public void operationComplete(final IoWriteFuture future) { - if (LOG.isTraceEnabled()) { - LOG.trace("Ssh write request finished on channel: {} with result: {}: and ex:{}, message: {}", - ctx.channel(), future.isWritten(), future.getException(), byteBufToString(byteBufMsg)); - } - - // Notify success or failure - if (future.isWritten()) { - promise.setSuccess(); - } else { - LOG.warn("Ssh write request failed on channel: {} for message: {}", ctx.channel(), byteBufToString(byteBufMsg), future.getException()); - promise.setFailure(future.getException()); + // synchronized block due to deadlock that happens on ssh window resize + // writes and pending writes would lock the underlyinch channel session + // window resize write would try to write the message on an already locked channelSession, + // while the pending write was in progress from the write callback + synchronized (asyncIn) { + if (LOG.isTraceEnabled()) { + LOG.trace("Ssh write request finished on channel: {} with result: {}: and ex:{}, message: {}", + ctx.channel(), future.isWritten(), future.getException(), byteBufToString(byteBufMsg)); + } + + // Notify success or failure + if (future.isWritten()) { + promise.setSuccess(); + } else { + LOG.warn("Ssh write request failed on channel: {} for message: {}", ctx.channel(), byteBufToString(byteBufMsg), future.getException()); + promise.setFailure(future.getException()); + } + + // Not needed anymore, release + byteBufMsg.release(); + + //rescheduling message from queue after successfully sent + if (wasPending) { + byteBufMsg.resetReaderIndex(); + pending.remove(); + } } - // Not needed anymore, release - byteBufMsg.release(); - // Check pending queue and schedule next // At this time we are guaranteed that we are not in pending state anymore so the next request should succeed writePendingIfAny(); } }); + } catch (final WritePendingException e) { - queueRequest(ctx, byteBufMsg, promise); + + if(wasPending == false){ + queueRequest(ctx, byteBufMsg, promise); + } } } - private synchronized void writePendingIfAny() { - if (pending.peek() == null) { - return; - } + private void writePendingIfAny() { + synchronized (asyncIn) { + if (pending.peek() == null) { + return; + } - // In case of pending, reschedule next message from queue - final PendingWriteRequest pendingWrite = pending.poll(); - final ByteBuf msg = pendingWrite.msg; - if (LOG.isTraceEnabled()) { - LOG.trace("Writing pending request on channel: {}, message: {}", pendingWrite.ctx.channel(), byteBufToString(msg)); - } + final PendingWriteRequest pendingWrite = pending.peek(); + final ByteBuf msg = pendingWrite.msg; + if (LOG.isTraceEnabled()) { + LOG.trace("Writing pending request on channel: {}, message: {}", pendingWrite.ctx.channel(), byteBufToString(msg)); + } - writeWithPendingDetection(pendingWrite.ctx, pendingWrite.promise, msg); + writeWithPendingDetection(pendingWrite.ctx, pendingWrite.promise, msg, true); + } } public static String byteBufToString(final ByteBuf msg) { - msg.resetReaderIndex(); final String s = msg.toString(Charsets.UTF_8); msg.resetReaderIndex(); return s; @@ -138,12 +167,13 @@ public final class AsyncSshHandlerWriter implements AutoCloseable { } @Override - public synchronized void close() { + public void close() { asyncIn = null; } private Buffer toBuffer(final ByteBuf msg) { // TODO Buffer vs ByteBuf translate, Can we handle that better ? + msg.resetReaderIndex(); final byte[] temp = new byte[msg.readableBytes()]; msg.readBytes(temp, 0, msg.readableBytes()); return new Buffer(temp);