From 8bdd819fcac266457c6d344aef869e51a4d4c0c1 Mon Sep 17 00:00:00 2001 From: Maros Marsalek Date: Wed, 20 Aug 2014 15:11:35 +0200 Subject: [PATCH] BUG-1568 Getting rid of StreamGobller from netconf ssh client. We have another thread in SshClientAdapter that works as StreamGobller http://www.ganymed.ethz.ch/ssh2/FAQ.html#blocking Change-Id: I3f61b14cb22396d8b5898028bd83f62d98c6866d Signed-off-by: Maros Marsalek --- .../handler/ssh/client/SshClientAdapter.java | 42 ++++++++----------- .../handler/ssh/client/SshSession.java | 11 +++-- 2 files changed, 23 insertions(+), 30 deletions(-) diff --git a/opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/controller/netconf/nettyutil/handler/ssh/client/SshClientAdapter.java b/opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/controller/netconf/nettyutil/handler/ssh/client/SshClientAdapter.java index 1a2eb3f1ab..4ca7bdf958 100644 --- a/opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/controller/netconf/nettyutil/handler/ssh/client/SshClientAdapter.java +++ b/opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/controller/netconf/nettyutil/handler/ssh/client/SshClientAdapter.java @@ -41,7 +41,7 @@ class SshClientAdapter implements Runnable { private OutputStream stdIn; - private Queue postponed = new LinkedList<>(); + private final Queue postponed = new LinkedList<>(); private ChannelHandlerContext ctx; private ChannelPromise disconnectPromise; @@ -50,42 +50,36 @@ class SshClientAdapter implements Runnable { private final Object lock = new Object(); - public SshClientAdapter(SshClient sshClient, Invoker invoker) { + public SshClientAdapter(final SshClient sshClient, final Invoker invoker) { this.sshClient = sshClient; this.invoker = invoker; } - // TODO: refactor + // TODO ganymed spawns a Thread that receives the data from remote inside TransportManager + // Get rid of this thread and reuse Ganymed internal thread (not sure if its possible without modifications in ganymed) public void run() { try { - SshSession session = sshClient.openSession(); + final SshSession session = sshClient.openSession(); invoker.invoke(session); - InputStream stdOut = session.getStdout(); - session.getStderr(); + final InputStream stdOut = session.getStdout(); synchronized (lock) { - stdIn = session.getStdin(); - ByteBuf message; - while ((message = postponed.poll()) != null) { - writeImpl(message); + while (postponed.peek() != null) { + writeImpl(postponed.poll()); } } while (!stopRequested.get()) { - byte[] readBuff = new byte[BUFFER_SIZE]; - int c = stdOut.read(readBuff); + final byte[] readBuff = new byte[BUFFER_SIZE]; + final int c = stdOut.read(readBuff); if (c == -1) { continue; } - byte[] tranBuff = new byte[c]; - System.arraycopy(readBuff, 0, tranBuff, 0, c); - ByteBuf byteBuf = Unpooled.buffer(c); - byteBuf.writeBytes(tranBuff); - ctx.fireChannelRead(byteBuf); + ctx.fireChannelRead(Unpooled.copiedBuffer(readBuff, 0, c)); } - } catch (Exception e) { + } catch (final Exception e) { logger.error("Unexpected exception", e); } finally { sshClient.close(); @@ -99,7 +93,7 @@ class SshClientAdapter implements Runnable { } // TODO: needs rework to match netconf framer API. - public void write(ByteBuf message) throws IOException { + public void write(final ByteBuf message) throws IOException { synchronized (lock) { if (stdIn == null) { postponed.add(message); @@ -109,28 +103,28 @@ class SshClientAdapter implements Runnable { } } - private void writeImpl(ByteBuf message) throws IOException { + private void writeImpl(final ByteBuf message) throws IOException { message.getBytes(0, stdIn, message.readableBytes()); message.release(); stdIn.flush(); } - public void stop(ChannelPromise promise) { + public void stop(final ChannelPromise promise) { synchronized (lock) { stopRequested.set(true); disconnectPromise = promise; } } - public Thread start(ChannelHandlerContext ctx, ChannelFuture channelFuture) { + public Thread start(final ChannelHandlerContext ctx, final ChannelFuture channelFuture) { checkArgument(channelFuture.isSuccess()); checkNotNull(ctx.channel().remoteAddress()); synchronized (this) { checkState(this.ctx == null); this.ctx = ctx; } - String threadName = toString(); - Thread thread = new Thread(this, threadName); + final String threadName = toString(); + final Thread thread = new Thread(this, threadName); thread.start(); return thread; } diff --git a/opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/controller/netconf/nettyutil/handler/ssh/client/SshSession.java b/opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/controller/netconf/nettyutil/handler/ssh/client/SshSession.java index 44893b8794..9cdc5926f0 100644 --- a/opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/controller/netconf/nettyutil/handler/ssh/client/SshSession.java +++ b/opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/controller/netconf/nettyutil/handler/ssh/client/SshSession.java @@ -9,8 +9,6 @@ package org.opendaylight.controller.netconf.nettyutil.handler.ssh.client; import ch.ethz.ssh2.Session; -import ch.ethz.ssh2.StreamGobbler; - import ch.ethz.ssh2.channel.Channel; import java.io.Closeable; import java.io.IOException; @@ -23,19 +21,20 @@ import java.io.OutputStream; class SshSession implements Closeable { private final Session session; - public SshSession(Session session) { + public SshSession(final Session session) { this.session = session; } - - public void startSubSystem(String name) throws IOException { + public void startSubSystem(final String name) throws IOException { session.startSubSystem(name); } public InputStream getStdout() { - return new StreamGobbler(session.getStdout()); + return session.getStdout(); } + // FIXME according to http://www.ganymed.ethz.ch/ssh2/FAQ.html#blocking you should read data from both stdout and stderr to prevent window filling up (since stdout and stderr share a window) + // FIXME stdErr is not used anywhere public InputStream getStderr() { return session.getStderr(); } -- 2.36.6