import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
+
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
+import java.util.LinkedList;
+import java.util.Queue;
import java.util.concurrent.atomic.AtomicBoolean;
import org.opendaylight.controller.netconf.util.handler.ssh.virtualsocket.VirtualSocketException;
+
/**
- * Worker thread class. Handles all downstream and upstream events in SSH Netty pipeline.
+ * Worker thread class. Handles all downstream and upstream events in SSH Netty
+ * pipeline.
*/
public class SshClientAdapter implements Runnable {
private final SshClient sshClient;
private InputStream stdErr;
private OutputStream stdIn;
+ private Queue<ByteBuf> postponned = new LinkedList<>();
+
+
private ChannelHandlerContext ctx;
private ChannelPromise disconnectPromise;
private final Object lock = new Object();
- public SshClientAdapter(SshClient sshClient,
- Invoker invoker) {
+ public SshClientAdapter(SshClient sshClient, Invoker invoker) {
this.sshClient = sshClient;
this.invoker = invoker;
}
try {
session = sshClient.openSession();
invoker.invoke(session);
-
stdOut = session.getStdout();
stdErr = session.getStderr();
- synchronized(lock) {
+ synchronized (lock) {
+
stdIn = session.getStdin();
+ ByteBuf message = null;
+ while ((message = postponned.poll()) != null) {
+ writeImpl(message);
+ }
}
while (stopRequested.get() == false) {
byte[] readBuff = new byte[1024];
int c = stdOut.read(readBuff);
-
+ if (c == -1) {
+ continue;
+ }
byte[] tranBuff = new byte[c];
System.arraycopy(readBuff, 0, tranBuff, 0, c);
sshClient.close();
synchronized (lock) {
- if(disconnectPromise != null) ctx.disconnect(disconnectPromise);
+ if (disconnectPromise != null)
+ ctx.disconnect(disconnectPromise);
}
}
}
// TODO: needs rework to match netconf framer API.
- public void write(String message) throws IOException {
+ public void write(ByteBuf message) throws IOException {
synchronized (lock) {
- if (stdIn == null) throw new IllegalStateException("StdIn not available");
+ if (stdIn == null) {
+ postponned.add(message);
+ return;
+ }
+ writeImpl(message);
}
- stdIn.write(message.getBytes());
+ }
+
+ private void writeImpl(ByteBuf message) throws IOException {
+ message.getBytes(0, stdIn, message.readableBytes());
stdIn.flush();
}
}
public void start(ChannelHandlerContext ctx) {
- if(this.ctx != null) return; // context is already associated.
-
+ if (this.ctx != null)
+ return; // context is already associated.
this.ctx = ctx;
new Thread(this).start();
}