X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=netconf%2Fnetconf-netty-util%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fnetconf%2Fnettyutil%2Fhandler%2Fssh%2Fclient%2FAsyncSshHandlerWriter.java;h=e05408551f065bdb151e0306e6c51f3100734377;hb=c59a0370adf5660009a8352e31c017ad4274039a;hp=7753c9fd43a39b8f06247a4356b6f9d73eaef4f7;hpb=40434cef79c60b346c61720a23e00bb237425d72;p=netconf.git diff --git a/netconf/netconf-netty-util/src/main/java/org/opendaylight/netconf/nettyutil/handler/ssh/client/AsyncSshHandlerWriter.java b/netconf/netconf-netty-util/src/main/java/org/opendaylight/netconf/nettyutil/handler/ssh/client/AsyncSshHandlerWriter.java index 7753c9fd43..e05408551f 100644 --- a/netconf/netconf-netty-util/src/main/java/org/opendaylight/netconf/nettyutil/handler/ssh/client/AsyncSshHandlerWriter.java +++ b/netconf/netconf-netty-util/src/main/java/org/opendaylight/netconf/nettyutil/handler/ssh/client/AsyncSshHandlerWriter.java @@ -12,16 +12,16 @@ import com.google.common.base.Preconditions; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelPromise; +import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.Deque; import java.util.LinkedList; import java.util.Queue; -import org.apache.sshd.common.future.SshFutureListener; import org.apache.sshd.common.io.IoOutputStream; -import org.apache.sshd.common.io.IoWriteFuture; import org.apache.sshd.common.io.WritePendingException; import org.apache.sshd.common.util.buffer.Buffer; import org.apache.sshd.common.util.buffer.ByteArrayBuffer; +import org.checkerframework.checker.lock.qual.GuardedBy; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -38,9 +38,10 @@ public final class AsyncSshHandlerWriter implements AutoCloseable { // TODO implement Limiting mechanism for pending writes // But there is a possible issue with limiting: // 1. What to do when queue is full ? Immediate Fail for every request ? - // 2. At this level we might be dealing with Chunks of messages(not whole messages) and unexpected behavior might occur - // when we send/queue 1 chunk and fail the other chunks + // 2. At this level we might be dealing with Chunks of messages(not whole messages) + // and unexpected behavior might occur when we send/queue 1 chunk and fail the other chunks + private final Object asyncInLock = new Object(); private volatile IoOutputStream asyncIn; // Order has to be preserved for queued writes @@ -50,6 +51,9 @@ public final class AsyncSshHandlerWriter implements AutoCloseable { this.asyncIn = asyncIn; } + @GuardedBy("asyncInLock") + private boolean isWriteExecuted = false; + public void write(final ChannelHandlerContext ctx, final Object msg, final ChannelPromise promise) { if (asyncIn == null) { @@ -60,14 +64,14 @@ public final class AsyncSshHandlerWriter implements AutoCloseable { // writes and pending writes would lock the underlyinch channel session // window resize write would try to write the message on an already locked channelSession // while the pending write was in progress from the write callback - synchronized (asyncIn) { + synchronized (asyncInLock) { // TODO check for isClosed, isClosing might be performed by mina SSH internally and is not required here // If we are closed/closing, set immediate fail if (asyncIn.isClosed() || asyncIn.isClosing()) { promise.setFailure(new IllegalStateException("Channel closed")); } else { final ByteBuf byteBufMsg = (ByteBuf) msg; - if (pending.isEmpty() == false) { + if (isWriteExecuted) { queueRequest(ctx, byteBufMsg, promise); return; } @@ -79,68 +83,72 @@ public final class AsyncSshHandlerWriter implements AutoCloseable { //sending message with pending //if resending message not succesfull, then attribute wasPending is true - private void writeWithPendingDetection(final ChannelHandlerContext ctx, final ChannelPromise promise, final ByteBuf byteBufMsg, final boolean wasPending) { + private void writeWithPendingDetection(final ChannelHandlerContext ctx, final ChannelPromise promise, + final ByteBuf byteBufMsg, final boolean wasPending) { try { if (LOG.isTraceEnabled()) { LOG.trace("Writing request on channel: {}, message: {}", ctx.channel(), byteBufToString(byteBufMsg)); } - asyncIn.write(toBuffer(byteBufMsg)).addListener(new SshFutureListener() { - - @Override - public void operationComplete(final IoWriteFuture future) { - // synchronized block due to deadlock that happens on ssh window resize - // writes and pending writes would lock the underlyinch channel session - // window resize write would try to write the message on an already locked channelSession, - // while the pending write was in progress from the write callback - synchronized (asyncIn) { - if (LOG.isTraceEnabled()) { - LOG.trace("Ssh write request finished on channel: {} with result: {}: and ex:{}, message: {}", - ctx.channel(), future.isWritten(), future.getException(), byteBufToString(byteBufMsg)); - } - - // Notify success or failure - if (future.isWritten()) { - promise.setSuccess(); - } else { - LOG.warn("Ssh write request failed on channel: {} for message: {}", ctx.channel(), byteBufToString(byteBufMsg), future.getException()); - promise.setFailure(future.getException()); - } - - // Not needed anymore, release - byteBufMsg.release(); - - //rescheduling message from queue after successfully sent - if (wasPending) { - byteBufMsg.resetReaderIndex(); - pending.remove(); - } + + isWriteExecuted = true; + + asyncIn.writePacket(toBuffer(byteBufMsg)).addListener(future -> { + // synchronized block due to deadlock that happens on ssh window resize + // writes and pending writes would lock the underlyinch channel session + // window resize write would try to write the message on an already locked channelSession, + // while the pending write was in progress from the write callback + synchronized (asyncInLock) { + if (LOG.isTraceEnabled()) { + LOG.trace( + "Ssh write request finished on channel: {} with result: {}: and ex:{}, message: {}", + ctx.channel(), future.isWritten(), future.getException(), byteBufToString(byteBufMsg)); } - // Check pending queue and schedule next - // At this time we are guaranteed that we are not in pending state anymore so the next request should succeed - writePendingIfAny(); + // Notify success or failure + if (future.isWritten()) { + promise.setSuccess(); + } else { + LOG.warn("Ssh write request failed on channel: {} for message: {}", ctx.channel(), + byteBufToString(byteBufMsg), future.getException()); + promise.setFailure(future.getException()); + } + + //rescheduling message from queue after successfully sent + if (wasPending) { + byteBufMsg.resetReaderIndex(); + pending.remove(); + } + + // Not needed anymore, release + byteBufMsg.release(); } - }); - } catch (final WritePendingException e) { + // Check pending queue and schedule next + // At this time we are guaranteed that we are not in pending state anymore + // so the next request should succeed + writePendingIfAny(); + }); - if(wasPending == false){ + } catch (final IOException | WritePendingException e) { + if (!wasPending) { queueRequest(ctx, byteBufMsg, promise); } } } private void writePendingIfAny() { - synchronized (asyncIn) { + synchronized (asyncInLock) { if (pending.peek() == null) { + isWriteExecuted = false; return; } final PendingWriteRequest pendingWrite = pending.peek(); final ByteBuf msg = pendingWrite.msg; if (LOG.isTraceEnabled()) { - LOG.trace("Writing pending request on channel: {}, message: {}", pendingWrite.ctx.channel(), byteBufToString(msg)); + LOG.trace("Writing pending request on channel: {}, message: {}", + pendingWrite.ctx.channel(), byteBufToString(msg)); } writeWithPendingDetection(pendingWrite.ctx, pendingWrite.promise, msg, true); @@ -161,7 +169,8 @@ public final class AsyncSshHandlerWriter implements AutoCloseable { } new PendingWriteRequest(ctx, msg, promise).pend(pending); // } catch (final Exception ex) { -// LOG.warn("Unable to queue write request on channel: {}. Setting fail for the request: {}", ctx.channel(), ex, byteBufToString(msg)); +// LOG.warn("Unable to queue write request on channel: {}. Setting fail for the request: {}", +// ctx.channel(), ex, byteBufToString(msg)); // msg.release(); // promise.setFailure(ex); // } @@ -173,10 +182,7 @@ public final class AsyncSshHandlerWriter implements AutoCloseable { } private static Buffer toBuffer(final ByteBuf msg) { - // FIXME: Translation from ByteBuf to Buffer. Buffer is an abstract class, so based on the assumptions - // we can make about the contents of ByteBuf, we should be able to skip copying byte arrays around - // by creating an appropriate subclass. - + // TODO Buffer vs ByteBuf translate, Can we handle that better ? msg.resetReaderIndex(); final byte[] temp = new byte[msg.readableBytes()]; msg.readBytes(temp, 0, msg.readableBytes()); @@ -188,7 +194,7 @@ public final class AsyncSshHandlerWriter implements AutoCloseable { private final ByteBuf msg; private final ChannelPromise promise; - public PendingWriteRequest(final ChannelHandlerContext ctx, final ByteBuf msg, final ChannelPromise promise) { + PendingWriteRequest(final ChannelHandlerContext ctx, final ByteBuf msg, final ChannelPromise promise) { this.ctx = ctx; // Reset reader index, last write (failed) attempt moved index to the end msg.resetReaderIndex(); @@ -200,8 +206,8 @@ public final class AsyncSshHandlerWriter implements AutoCloseable { // Preconditions.checkState(pending.size() < MAX_PENDING_WRITES, // "Too much pending writes(%s) on channel: %s, remote window is not getting read or is too small", // pending.size(), ctx.channel()); - Preconditions.checkState(pending.offer(this), "Cannot pend another request write (pending count: %s) on channel: %s", - pending.size(), ctx.channel()); + Preconditions.checkState(pending.offer(this), + "Cannot pend another request write (pending count: %s) on channel: %s", pending.size(), ctx.channel()); } } }