X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fnetconf%2Fnetconf-netty-util%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fnetconf%2Fnettyutil%2Fhandler%2Fssh%2Fclient%2FAsyncSshHandlerWriter.java;h=10941021bceaa1e12433ff5291e7d55741ad822a;hp=1e976ce6a1394c31e0bbd8dd8746463193a3ac5c;hb=63bf2c78aab8a4a48201877a5b5de255ff3cdf75;hpb=a2563a94253f9c2603e0ab25b8f412ea07fcf51d diff --git a/opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/controller/netconf/nettyutil/handler/ssh/client/AsyncSshHandlerWriter.java b/opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/controller/netconf/nettyutil/handler/ssh/client/AsyncSshHandlerWriter.java index 1e976ce6a1..10941021bc 100644 --- a/opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/controller/netconf/nettyutil/handler/ssh/client/AsyncSshHandlerWriter.java +++ b/opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/controller/netconf/nettyutil/handler/ssh/client/AsyncSshHandlerWriter.java @@ -62,12 +62,15 @@ public final class AsyncSshHandlerWriter implements AutoCloseable { return; } - writeWithPendingDetection(ctx, promise, byteBufMsg); + writeWithPendingDetection(ctx, promise, byteBufMsg, false); } } - private void writeWithPendingDetection(final ChannelHandlerContext ctx, final ChannelPromise promise, final ByteBuf byteBufMsg) { + //sending message with pending + //if resending message not succesfull, then attribute wasPending is true + private void writeWithPendingDetection(final ChannelHandlerContext ctx, final ChannelPromise promise, final ByteBuf byteBufMsg, final boolean wasPending) { try { + if (LOG.isTraceEnabled()) { LOG.trace("Writing request on channel: {}, message: {}", ctx.channel(), byteBufToString(byteBufMsg)); } @@ -77,7 +80,7 @@ public final class AsyncSshHandlerWriter implements AutoCloseable { public void operationComplete(final IoWriteFuture future) { if (LOG.isTraceEnabled()) { LOG.trace("Ssh write request finished on channel: {} with result: {}: and ex:{}, message: {}", - ctx.channel(), future.isWritten(), future.getException(), byteBufToString(byteBufMsg)); + ctx.channel(), future.isWritten(), future.getException(), byteBufToString(byteBufMsg)); } // Notify success or failure @@ -91,33 +94,45 @@ public final class AsyncSshHandlerWriter implements AutoCloseable { // Not needed anymore, release byteBufMsg.release(); + synchronized (AsyncSshHandlerWriter.this) { + //rescheduling message from queue after successfully sent + if (wasPending) { + byteBufMsg.resetReaderIndex(); + pending.remove(); + } + } + // Check pending queue and schedule next // At this time we are guaranteed that we are not in pending state anymore so the next request should succeed writePendingIfAny(); + } }); + } catch (final WritePendingException e) { - queueRequest(ctx, byteBufMsg, promise); + + if(wasPending == false){ + queueRequest(ctx, byteBufMsg, promise); + } } } private synchronized void writePendingIfAny() { + if (pending.peek() == null) { return; } - // In case of pending, reschedule next message from queue - final PendingWriteRequest pendingWrite = pending.poll(); + final PendingWriteRequest pendingWrite = pending.peek(); final ByteBuf msg = pendingWrite.msg; if (LOG.isTraceEnabled()) { LOG.trace("Writing pending request on channel: {}, message: {}", pendingWrite.ctx.channel(), byteBufToString(msg)); } - writeWithPendingDetection(pendingWrite.ctx, pendingWrite.promise, msg); + writeWithPendingDetection(pendingWrite.ctx, pendingWrite.promise, msg, true); } public static String byteBufToString(final ByteBuf msg) { - msg.resetReaderIndex(); final String s = msg.toString(Charsets.UTF_8); msg.resetReaderIndex(); return s; @@ -144,6 +159,7 @@ public final class AsyncSshHandlerWriter implements AutoCloseable { private Buffer toBuffer(final ByteBuf msg) { // TODO Buffer vs ByteBuf translate, Can we handle that better ? + msg.resetReaderIndex(); final byte[] temp = new byte[msg.readableBytes()]; msg.readBytes(temp, 0, msg.readableBytes()); return new Buffer(temp);