X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fnetconf%2Fnetconf-netty-util%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fnetconf%2Fnettyutil%2Fhandler%2Fssh%2Fclient%2FAsyncSshHandlerWriter.java;h=10941021bceaa1e12433ff5291e7d55741ad822a;hp=eace0ac7eafe1d09381fbe597f9f6acfece953e6;hb=63bf2c78aab8a4a48201877a5b5de255ff3cdf75;hpb=05a8052a457b2e53f06233f1a0b056d162118566 diff --git a/opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/controller/netconf/nettyutil/handler/ssh/client/AsyncSshHandlerWriter.java b/opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/controller/netconf/nettyutil/handler/ssh/client/AsyncSshHandlerWriter.java index eace0ac7ea..10941021bc 100644 --- a/opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/controller/netconf/nettyutil/handler/ssh/client/AsyncSshHandlerWriter.java +++ b/opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/controller/netconf/nettyutil/handler/ssh/client/AsyncSshHandlerWriter.java @@ -28,9 +28,9 @@ import org.slf4j.LoggerFactory; * Async Ssh writer. Takes messages(byte arrays) and sends them encrypted to remote server. * Also handles pending writes by caching requests until pending state is over. */ -final class AsyncSshHandlerWriter implements AutoCloseable { +public final class AsyncSshHandlerWriter implements AutoCloseable { - private static final Logger logger = LoggerFactory + private static final Logger LOG = LoggerFactory .getLogger(AsyncSshHandlerWriter.class); // public static final int MAX_PENDING_WRITES = 1000; @@ -62,62 +62,77 @@ final class AsyncSshHandlerWriter implements AutoCloseable { return; } - writeWithPendingDetection(ctx, promise, byteBufMsg); + writeWithPendingDetection(ctx, promise, byteBufMsg, false); } } - private void writeWithPendingDetection(final ChannelHandlerContext ctx, final ChannelPromise promise, final ByteBuf byteBufMsg) { + //sending message with pending + //if resending message not succesfull, then attribute wasPending is true + private void writeWithPendingDetection(final ChannelHandlerContext ctx, final ChannelPromise promise, final ByteBuf byteBufMsg, final boolean wasPending) { try { - if (logger.isTraceEnabled()) { - logger.trace("Writing request on channel: {}, message: {}", ctx.channel(), byteBufToString(byteBufMsg)); + + if (LOG.isTraceEnabled()) { + LOG.trace("Writing request on channel: {}, message: {}", ctx.channel(), byteBufToString(byteBufMsg)); } asyncIn.write(toBuffer(byteBufMsg)).addListener(new SshFutureListener() { - @Override - public void operationComplete(final IoWriteFuture future) { - if (logger.isTraceEnabled()) { - logger.trace("Ssh write request finished on channel: {} with result: {}: and ex:{}, message: {}", - ctx.channel(), future.isWritten(), future.getException(), byteBufToString(byteBufMsg)); - } - - // Notify success or failure - if (future.isWritten()) { - promise.setSuccess(); - } else { - logger.warn("Ssh write request failed on channel: {} for message: {}", ctx.channel(), byteBufToString(byteBufMsg), future.getException()); - promise.setFailure(future.getException()); - } - - // Not needed anymore, release - byteBufMsg.release(); - - // Check pending queue and schedule next - // At this time we are guaranteed that we are not in pending state anymore so the next request should succeed - writePendingIfAny(); + @Override + public void operationComplete(final IoWriteFuture future) { + if (LOG.isTraceEnabled()) { + LOG.trace("Ssh write request finished on channel: {} with result: {}: and ex:{}, message: {}", + ctx.channel(), future.isWritten(), future.getException(), byteBufToString(byteBufMsg)); + } + + // Notify success or failure + if (future.isWritten()) { + promise.setSuccess(); + } else { + LOG.warn("Ssh write request failed on channel: {} for message: {}", ctx.channel(), byteBufToString(byteBufMsg), future.getException()); + promise.setFailure(future.getException()); + } + + // Not needed anymore, release + byteBufMsg.release(); + + synchronized (AsyncSshHandlerWriter.this) { + //rescheduling message from queue after successfully sent + if (wasPending) { + byteBufMsg.resetReaderIndex(); + pending.remove(); } - }); + } + + // Check pending queue and schedule next + // At this time we are guaranteed that we are not in pending state anymore so the next request should succeed + writePendingIfAny(); + + } + }); + } catch (final WritePendingException e) { - queueRequest(ctx, byteBufMsg, promise); + + if(wasPending == false){ + queueRequest(ctx, byteBufMsg, promise); + } } } private synchronized void writePendingIfAny() { + if (pending.peek() == null) { return; } - // In case of pending, reschedule next message from queue - final PendingWriteRequest pendingWrite = pending.poll(); + final PendingWriteRequest pendingWrite = pending.peek(); final ByteBuf msg = pendingWrite.msg; - if (logger.isTraceEnabled()) { - logger.trace("Writing pending request on channel: {}, message: {}", pendingWrite.ctx.channel(), byteBufToString(msg)); + if (LOG.isTraceEnabled()) { + LOG.trace("Writing pending request on channel: {}, message: {}", pendingWrite.ctx.channel(), byteBufToString(msg)); } - writeWithPendingDetection(pendingWrite.ctx, pendingWrite.promise, msg); + writeWithPendingDetection(pendingWrite.ctx, pendingWrite.promise, msg, true); } - private static String byteBufToString(final ByteBuf msg) { - msg.resetReaderIndex(); + public static String byteBufToString(final ByteBuf msg) { final String s = msg.toString(Charsets.UTF_8); msg.resetReaderIndex(); return s; @@ -125,13 +140,13 @@ final class AsyncSshHandlerWriter implements AutoCloseable { private void queueRequest(final ChannelHandlerContext ctx, final ByteBuf msg, final ChannelPromise promise) { // try { - logger.debug("Write pending on channel: {}, queueing, current queue size: {}", ctx.channel(), pending.size()); - if (logger.isTraceEnabled()) { - logger.trace("Queueing request due to pending: {}", byteBufToString(msg)); + LOG.debug("Write pending on channel: {}, queueing, current queue size: {}", ctx.channel(), pending.size()); + if (LOG.isTraceEnabled()) { + LOG.trace("Queueing request due to pending: {}", byteBufToString(msg)); } new PendingWriteRequest(ctx, msg, promise).pend(pending); // } catch (final Exception ex) { -// logger.warn("Unable to queue write request on channel: {}. Setting fail for the request: {}", ctx.channel(), ex, byteBufToString(msg)); +// LOG.warn("Unable to queue write request on channel: {}. Setting fail for the request: {}", ctx.channel(), ex, byteBufToString(msg)); // msg.release(); // promise.setFailure(ex); // } @@ -144,6 +159,7 @@ final class AsyncSshHandlerWriter implements AutoCloseable { private Buffer toBuffer(final ByteBuf msg) { // TODO Buffer vs ByteBuf translate, Can we handle that better ? + msg.resetReaderIndex(); final byte[] temp = new byte[msg.readableBytes()]; msg.readBytes(temp, 0, msg.readableBytes()); return new Buffer(temp);