private OutputStream stdIn;
- private Queue<ByteBuf> postponed = new LinkedList<>();
+ private final Queue<ByteBuf> postponed = new LinkedList<>();
private ChannelHandlerContext ctx;
private ChannelPromise disconnectPromise;
private final Object lock = new Object();
- public SshClientAdapter(SshClient sshClient, Invoker invoker) {
+ public SshClientAdapter(final SshClient sshClient, final Invoker invoker) {
this.sshClient = sshClient;
this.invoker = invoker;
}
- // TODO: refactor
+ // TODO ganymed spawns a Thread that receives the data from remote inside TransportManager
+ // Get rid of this thread and reuse Ganymed internal thread (not sure if its possible without modifications in ganymed)
public void run() {
try {
- SshSession session = sshClient.openSession();
+ final SshSession session = sshClient.openSession();
invoker.invoke(session);
- InputStream stdOut = session.getStdout();
- session.getStderr();
+ final InputStream stdOut = session.getStdout();
synchronized (lock) {
-
stdIn = session.getStdin();
- ByteBuf message;
- while ((message = postponed.poll()) != null) {
- writeImpl(message);
+ while (postponed.peek() != null) {
+ writeImpl(postponed.poll());
}
}
while (!stopRequested.get()) {
- byte[] readBuff = new byte[BUFFER_SIZE];
- int c = stdOut.read(readBuff);
+ final byte[] readBuff = new byte[BUFFER_SIZE];
+ final int c = stdOut.read(readBuff);
if (c == -1) {
continue;
}
- byte[] tranBuff = new byte[c];
- System.arraycopy(readBuff, 0, tranBuff, 0, c);
- ByteBuf byteBuf = Unpooled.buffer(c);
- byteBuf.writeBytes(tranBuff);
- ctx.fireChannelRead(byteBuf);
+ ctx.fireChannelRead(Unpooled.copiedBuffer(readBuff, 0, c));
}
- } catch (Exception e) {
+ } catch (final Exception e) {
logger.error("Unexpected exception", e);
} finally {
sshClient.close();
}
// TODO: needs rework to match netconf framer API.
- public void write(ByteBuf message) throws IOException {
+ public void write(final ByteBuf message) throws IOException {
synchronized (lock) {
if (stdIn == null) {
postponed.add(message);
}
}
- private void writeImpl(ByteBuf message) throws IOException {
+ private void writeImpl(final ByteBuf message) throws IOException {
message.getBytes(0, stdIn, message.readableBytes());
message.release();
stdIn.flush();
}
- public void stop(ChannelPromise promise) {
+ public void stop(final ChannelPromise promise) {
synchronized (lock) {
stopRequested.set(true);
disconnectPromise = promise;
}
}
- public Thread start(ChannelHandlerContext ctx, ChannelFuture channelFuture) {
+ public Thread start(final ChannelHandlerContext ctx, final ChannelFuture channelFuture) {
checkArgument(channelFuture.isSuccess());
checkNotNull(ctx.channel().remoteAddress());
synchronized (this) {
checkState(this.ctx == null);
this.ctx = ctx;
}
- String threadName = toString();
- Thread thread = new Thread(this, threadName);
+ final String threadName = toString();
+ final Thread thread = new Thread(this, threadName);
thread.start();
return thread;
}