X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;ds=sidebyside;f=opendaylight%2Fnetconf%2Fnetconf-netty-util%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fnetconf%2Fnettyutil%2Fhandler%2Fssh%2Fclient%2FAsyncSshHandlerReader.java;fp=opendaylight%2Fnetconf%2Fnetconf-netty-util%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fnetconf%2Fnettyutil%2Fhandler%2Fssh%2Fclient%2FAsyncSshHanderReader.java;h=ada15583cdef9cc86eb6a6d3b45bcf8f9be8defc;hb=70c54e6de3804373c36d70145483b3937d2aca2f;hp=73a24f27b2d7b839e8bd04c3bdced2dabb796a39;hpb=d5fc2eb62f064ea21041ed25f2a64a9803391ca1;p=controller.git diff --git a/opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/controller/netconf/nettyutil/handler/ssh/client/AsyncSshHanderReader.java b/opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/controller/netconf/nettyutil/handler/ssh/client/AsyncSshHandlerReader.java similarity index 66% rename from opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/controller/netconf/nettyutil/handler/ssh/client/AsyncSshHanderReader.java rename to opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/controller/netconf/nettyutil/handler/ssh/client/AsyncSshHandlerReader.java index 73a24f27b2..ada15583cd 100644 --- a/opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/controller/netconf/nettyutil/handler/ssh/client/AsyncSshHanderReader.java +++ b/opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/controller/netconf/nettyutil/handler/ssh/client/AsyncSshHandlerReader.java @@ -8,9 +8,8 @@ package org.opendaylight.controller.netconf.nettyutil.handler.ssh.client; +import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; -import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelOutboundHandler; import org.apache.sshd.common.future.SshFutureListener; import org.apache.sshd.common.io.IoInputStream; import org.apache.sshd.common.io.IoReadFuture; @@ -22,22 +21,24 @@ import org.slf4j.LoggerFactory; * Listener on async input stream from SSH session. * This listeners schedules reads in a loop until the session is closed or read fails. */ -final class AsyncSshHanderReader implements SshFutureListener, AutoCloseable { +public final class AsyncSshHandlerReader implements SshFutureListener, AutoCloseable { private static final Logger logger = LoggerFactory.getLogger(AsyncSshHandler.class); private static final int BUFFER_SIZE = 8192; - private final ChannelOutboundHandler asyncSshHandler; - private final ChannelHandlerContext ctx; + private final AutoCloseable connectionClosedCallback; + private final ReadMsgHandler readHandler; + private final String channelId; private IoInputStream asyncOut; private Buffer buf; private IoReadFuture currentReadFuture; - public AsyncSshHanderReader(final ChannelOutboundHandler asyncSshHandler, final ChannelHandlerContext ctx, final IoInputStream asyncOut) { - this.asyncSshHandler = asyncSshHandler; - this.ctx = ctx; + public AsyncSshHandlerReader(final AutoCloseable connectionClosedCallback, final ReadMsgHandler readHandler, final String channelId, final IoInputStream asyncOut) { + this.connectionClosedCallback = connectionClosedCallback; + this.readHandler = readHandler; + this.channelId = channelId; this.asyncOut = asyncOut; buf = new Buffer(BUFFER_SIZE); asyncOut.read(buf).addListener(this); @@ -48,16 +49,20 @@ final class AsyncSshHanderReader implements SshFutureListener, Aut if(future.getException() != null) { if(asyncOut.isClosed() || asyncOut.isClosing()) { // Ssh dropped - logger.debug("Ssh session dropped on channel: {}", ctx.channel(), future.getException()); + logger.debug("Ssh session dropped on channel: {}", channelId, future.getException()); } else { - logger.warn("Exception while reading from SSH remote on channel {}", ctx.channel(), future.getException()); + logger.warn("Exception while reading from SSH remote on channel {}", channelId, future.getException()); } invokeDisconnect(); return; } if (future.getRead() > 0) { - ctx.fireChannelRead(Unpooled.wrappedBuffer(buf.array(), 0, future.getRead())); + final ByteBuf msg = Unpooled.wrappedBuffer(buf.array(), 0, future.getRead()); + if(logger.isTraceEnabled()) { + logger.trace("Reading message on channel: {}, message: {}", channelId, AsyncSshHandlerWriter.byteBufToString(msg)); + } + readHandler.onMessageRead(msg); // Schedule next read buf = new Buffer(BUFFER_SIZE); @@ -68,7 +73,7 @@ final class AsyncSshHanderReader implements SshFutureListener, Aut private void invokeDisconnect() { try { - asyncSshHandler.disconnect(ctx, ctx.newPromise()); + connectionClosedCallback.close(); } catch (final Exception e) { // This should not happen throw new IllegalStateException(e); @@ -80,8 +85,14 @@ final class AsyncSshHanderReader implements SshFutureListener, Aut // Remove self as listener on close to prevent reading from closed input if(currentReadFuture != null) { currentReadFuture.removeListener(this); + currentReadFuture = null; } asyncOut = null; } + + public interface ReadMsgHandler { + + void onMessageRead(ByteBuf msg); + } }