* pipeline.
*/
public class SshClientAdapter implements Runnable {
+ private static final int BUFFER_SIZE = 1024;
+
private final SshClient sshClient;
private final Invoker invoker;
- private SshSession session;
- private InputStream stdOut;
- private InputStream stdErr;
private OutputStream stdIn;
- private Queue<ByteBuf> postponned = new LinkedList<>();
-
+ private Queue<ByteBuf> postponed = new LinkedList<>();
private ChannelHandlerContext ctx;
private ChannelPromise disconnectPromise;
public void run() {
try {
- session = sshClient.openSession();
+ SshSession session = sshClient.openSession();
invoker.invoke(session);
- stdOut = session.getStdout();
- stdErr = session.getStderr();
+ InputStream stdOut = session.getStdout();
+ session.getStderr();
synchronized (lock) {
stdIn = session.getStdin();
- ByteBuf message = null;
- while ((message = postponned.poll()) != null) {
+ ByteBuf message;
+ while ((message = postponed.poll()) != null) {
writeImpl(message);
}
}
while (stopRequested.get() == false) {
- byte[] readBuff = new byte[1024];
+ byte[] readBuff = new byte[BUFFER_SIZE];
int c = stdOut.read(readBuff);
if (c == -1) {
continue;
sshClient.close();
synchronized (lock) {
- if (disconnectPromise != null)
+ if (disconnectPromise != null) {
ctx.disconnect(disconnectPromise);
+ }
}
}
}
public void write(ByteBuf message) throws IOException {
synchronized (lock) {
if (stdIn == null) {
- postponned.add(message);
+ postponed.add(message);
return;
}
writeImpl(message);
}
public void start(ChannelHandlerContext ctx) {
- if (this.ctx != null)
- return; // context is already associated.
+ if (this.ctx != null) {
+ // context is already associated.
+ return;
+ }
this.ctx = ctx;
new Thread(this).start();
}