private OutputStream stdIn;
- private Queue<ByteBuf> postponed = new LinkedList<>();
+ private final Queue<ByteBuf> postponed = new LinkedList<>();
private ChannelHandlerContext ctx;
private ChannelPromise disconnectPromise;
private final Object lock = new Object();
- public SshClientAdapter(SshClient sshClient, Invoker invoker) {
+ public SshClientAdapter(final SshClient sshClient, final Invoker invoker) {
this.sshClient = sshClient;
this.invoker = invoker;
}
- // TODO: refactor
+ // TODO ganymed spawns a Thread that receives the data from remote inside TransportManager
+ // Get rid of this thread and reuse Ganymed internal thread (not sure if its possible without modifications in ganymed)
public void run() {
try {
- SshSession session = sshClient.openSession();
+ final SshSession session = sshClient.openSession();
invoker.invoke(session);
- InputStream stdOut = session.getStdout();
- session.getStderr();
+ final InputStream stdOut = session.getStdout();
synchronized (lock) {
-
stdIn = session.getStdin();
- ByteBuf message;
- while ((message = postponed.poll()) != null) {
- writeImpl(message);
+ while (postponed.peek() != null) {
+ writeImpl(postponed.poll());
}
}
while (!stopRequested.get()) {
- byte[] readBuff = new byte[BUFFER_SIZE];
- int c = stdOut.read(readBuff);
+ final byte[] readBuff = new byte[BUFFER_SIZE];
+ final int c = stdOut.read(readBuff);
if (c == -1) {
continue;
}
- byte[] tranBuff = new byte[c];
- System.arraycopy(readBuff, 0, tranBuff, 0, c);
- ByteBuf byteBuf = Unpooled.buffer(c);
- byteBuf.writeBytes(tranBuff);
- ctx.fireChannelRead(byteBuf);
+ ctx.fireChannelRead(Unpooled.copiedBuffer(readBuff, 0, c));
}
- } catch (Exception e) {
+ } catch (final Exception e) {
logger.error("Unexpected exception", e);
} finally {
sshClient.close();
}
// TODO: needs rework to match netconf framer API.
- public void write(ByteBuf message) throws IOException {
+ public void write(final ByteBuf message) throws IOException {
synchronized (lock) {
if (stdIn == null) {
postponed.add(message);
}
}
- private void writeImpl(ByteBuf message) throws IOException {
+ private void writeImpl(final ByteBuf message) throws IOException {
message.getBytes(0, stdIn, message.readableBytes());
message.release();
stdIn.flush();
}
- public void stop(ChannelPromise promise) {
+ public void stop(final ChannelPromise promise) {
synchronized (lock) {
stopRequested.set(true);
disconnectPromise = promise;
}
}
- public Thread start(ChannelHandlerContext ctx, ChannelFuture channelFuture) {
+ public Thread start(final ChannelHandlerContext ctx, final ChannelFuture channelFuture) {
checkArgument(channelFuture.isSuccess());
checkNotNull(ctx.channel().remoteAddress());
synchronized (this) {
checkState(this.ctx == null);
this.ctx = ctx;
}
- String threadName = toString();
- Thread thread = new Thread(this, threadName);
+ final String threadName = toString();
+ final Thread thread = new Thread(this, threadName);
thread.start();
return thread;
}
package org.opendaylight.controller.netconf.nettyutil.handler.ssh.client;
import ch.ethz.ssh2.Session;
-import ch.ethz.ssh2.StreamGobbler;
-
import ch.ethz.ssh2.channel.Channel;
import java.io.Closeable;
import java.io.IOException;
class SshSession implements Closeable {
private final Session session;
- public SshSession(Session session) {
+ public SshSession(final Session session) {
this.session = session;
}
-
- public void startSubSystem(String name) throws IOException {
+ public void startSubSystem(final String name) throws IOException {
session.startSubSystem(name);
}
public InputStream getStdout() {
- return new StreamGobbler(session.getStdout());
+ return session.getStdout();
}
+ // FIXME according to http://www.ganymed.ethz.ch/ssh2/FAQ.html#blocking you should read data from both stdout and stderr to prevent window filling up (since stdout and stderr share a window)
+ // FIXME stdErr is not used anywhere
public InputStream getStderr() {
return session.getStderr();
}