package org.opendaylight.controller.netconf.nettyutil.handler.ssh.client;
+import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelOutboundHandler;
import org.apache.sshd.common.future.SshFutureListener;
import org.apache.sshd.common.io.IoInputStream;
import org.apache.sshd.common.io.IoReadFuture;
* Listener on async input stream from SSH session.
* This listeners schedules reads in a loop until the session is closed or read fails.
*/
-final class AsyncSshHanderReader implements SshFutureListener<IoReadFuture>, AutoCloseable {
+public final class AsyncSshHandlerReader implements SshFutureListener<IoReadFuture>, AutoCloseable {
private static final Logger logger = LoggerFactory.getLogger(AsyncSshHandler.class);
private static final int BUFFER_SIZE = 8192;
- private final ChannelOutboundHandler asyncSshHandler;
- private final ChannelHandlerContext ctx;
+ private final AutoCloseable connectionClosedCallback;
+ private final ReadMsgHandler readHandler;
+ private final String channelId;
private IoInputStream asyncOut;
private Buffer buf;
private IoReadFuture currentReadFuture;
- public AsyncSshHanderReader(final ChannelOutboundHandler asyncSshHandler, final ChannelHandlerContext ctx, final IoInputStream asyncOut) {
- this.asyncSshHandler = asyncSshHandler;
- this.ctx = ctx;
+ public AsyncSshHandlerReader(final AutoCloseable connectionClosedCallback, final ReadMsgHandler readHandler, final String channelId, final IoInputStream asyncOut) {
+ this.connectionClosedCallback = connectionClosedCallback;
+ this.readHandler = readHandler;
+ this.channelId = channelId;
this.asyncOut = asyncOut;
buf = new Buffer(BUFFER_SIZE);
asyncOut.read(buf).addListener(this);
if(future.getException() != null) {
if(asyncOut.isClosed() || asyncOut.isClosing()) {
// Ssh dropped
- logger.debug("Ssh session dropped on channel: {}", ctx.channel(), future.getException());
+ logger.debug("Ssh session dropped on channel: {}", channelId, future.getException());
} else {
- logger.warn("Exception while reading from SSH remote on channel {}", ctx.channel(), future.getException());
+ logger.warn("Exception while reading from SSH remote on channel {}", channelId, future.getException());
}
invokeDisconnect();
return;
}
if (future.getRead() > 0) {
- ctx.fireChannelRead(Unpooled.wrappedBuffer(buf.array(), 0, future.getRead()));
+ final ByteBuf msg = Unpooled.wrappedBuffer(buf.array(), 0, future.getRead());
+ if(logger.isTraceEnabled()) {
+ logger.trace("Reading message on channel: {}, message: {}", channelId, AsyncSshHandlerWriter.byteBufToString(msg));
+ }
+ readHandler.onMessageRead(msg);
// Schedule next read
buf = new Buffer(BUFFER_SIZE);
private void invokeDisconnect() {
try {
- asyncSshHandler.disconnect(ctx, ctx.newPromise());
+ connectionClosedCallback.close();
} catch (final Exception e) {
// This should not happen
throw new IllegalStateException(e);
// Remove self as listener on close to prevent reading from closed input
if(currentReadFuture != null) {
currentReadFuture.removeListener(this);
+ currentReadFuture = null;
}
asyncOut = null;
}
+
+ public interface ReadMsgHandler {
+
+ void onMessageRead(ByteBuf msg);
+ }
}