X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fnetconf%2Fnetconf-util%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fnetconf%2Futil%2Fhandler%2Fssh%2Fclient%2FSshClientAdapter.java;h=244bcc0041c963988aa8fc78c712767fc7c643c2;hp=a50462e40dae1e3c5ce40afd451713992ad28e2a;hb=e159106bc148e76fc1e3e3c780bdd740d99e74ed;hpb=17c23f3c4f9726f268b481398ee0ca1df499b324 diff --git a/opendaylight/netconf/netconf-util/src/main/java/org/opendaylight/controller/netconf/util/handler/ssh/client/SshClientAdapter.java b/opendaylight/netconf/netconf-util/src/main/java/org/opendaylight/controller/netconf/util/handler/ssh/client/SshClientAdapter.java index a50462e40d..244bcc0041 100644 --- a/opendaylight/netconf/netconf-util/src/main/java/org/opendaylight/controller/netconf/util/handler/ssh/client/SshClientAdapter.java +++ b/opendaylight/netconf/netconf-util/src/main/java/org/opendaylight/controller/netconf/util/handler/ssh/client/SshClientAdapter.java @@ -12,24 +12,30 @@ import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelPromise; + import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.util.LinkedList; +import java.util.Queue; import java.util.concurrent.atomic.AtomicBoolean; import org.opendaylight.controller.netconf.util.handler.ssh.virtualsocket.VirtualSocketException; + /** - * Worker thread class. Handles all downstream and upstream events in SSH Netty pipeline. + * Worker thread class. Handles all downstream and upstream events in SSH Netty + * pipeline. */ public class SshClientAdapter implements Runnable { + private static final int BUFFER_SIZE = 1024; + private final SshClient sshClient; private final Invoker invoker; - private SshSession session; - private InputStream stdOut; - private InputStream stdErr; private OutputStream stdIn; + private Queue postponed = new LinkedList<>(); + private ChannelHandlerContext ctx; private ChannelPromise disconnectPromise; @@ -37,28 +43,33 @@ public class SshClientAdapter implements Runnable { private final Object lock = new Object(); - public SshClientAdapter(SshClient sshClient, - Invoker invoker) { + public SshClientAdapter(SshClient sshClient, Invoker invoker) { this.sshClient = sshClient; this.invoker = invoker; } public void run() { try { - session = sshClient.openSession(); + SshSession session = sshClient.openSession(); invoker.invoke(session); + InputStream stdOut = session.getStdout(); + session.getStderr(); - stdOut = session.getStdout(); - stdErr = session.getStderr(); + synchronized (lock) { - synchronized(lock) { stdIn = session.getStdin(); + ByteBuf message; + while ((message = postponed.poll()) != null) { + writeImpl(message); + } } - while (stopRequested.get() == false) { - byte[] readBuff = new byte[1024]; + while (!stopRequested.get()) { + byte[] readBuff = new byte[BUFFER_SIZE]; int c = stdOut.read(readBuff); - + if (c == -1) { + continue; + } byte[] tranBuff = new byte[c]; System.arraycopy(readBuff, 0, tranBuff, 0, c); @@ -71,22 +82,31 @@ public class SshClientAdapter implements Runnable { // Netty closed connection prematurely. // Just pass and move on. } catch (Exception e) { - throw new RuntimeException(e); + throw new IllegalStateException(e); } finally { sshClient.close(); synchronized (lock) { - if(disconnectPromise != null) ctx.disconnect(disconnectPromise); + if (disconnectPromise != null) { + ctx.disconnect(disconnectPromise); + } } } } // TODO: needs rework to match netconf framer API. - public void write(String message) throws IOException { + public void write(ByteBuf message) throws IOException { synchronized (lock) { - if (stdIn == null) throw new IllegalStateException("StdIn not available"); + if (stdIn == null) { + postponed.add(message); + return; + } + writeImpl(message); } - stdIn.write(message.getBytes()); + } + + private void writeImpl(ByteBuf message) throws IOException { + message.getBytes(0, stdIn, message.readableBytes()); stdIn.flush(); } @@ -98,8 +118,10 @@ public class SshClientAdapter implements Runnable { } public void start(ChannelHandlerContext ctx) { - if(this.ctx != null) return; // context is already associated. - + if (this.ctx != null) { + // context is already associated. + return; + } this.ctx = ctx; new Thread(this).start(); }