X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fnetconf%2Fnetconf-netty-util%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fnetconf%2Fnettyutil%2Fhandler%2Fssh%2Fclient%2FSshClientAdapter.java;h=4ca7bdf9580e40ec328d1057a1230a5c7d6eebec;hp=ad8b25ff2156d8e937d65d054b41b1e3f34c159e;hb=38126131b91a300c7545df31b9b1ba846292696e;hpb=5e7328e70f420ee4460a9f64f10368175c851370 diff --git a/opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/controller/netconf/nettyutil/handler/ssh/client/SshClientAdapter.java b/opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/controller/netconf/nettyutil/handler/ssh/client/SshClientAdapter.java index ad8b25ff21..4ca7bdf958 100644 --- a/opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/controller/netconf/nettyutil/handler/ssh/client/SshClientAdapter.java +++ b/opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/controller/netconf/nettyutil/handler/ssh/client/SshClientAdapter.java @@ -8,8 +8,13 @@ package org.opendaylight.controller.netconf.nettyutil.handler.ssh.client; +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; +import static com.google.common.base.Preconditions.checkState; + import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; +import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelPromise; import java.io.IOException; @@ -18,7 +23,6 @@ import java.io.OutputStream; import java.util.LinkedList; import java.util.Queue; import java.util.concurrent.atomic.AtomicBoolean; -import org.opendaylight.controller.netconf.nettyutil.handler.ssh.virtualsocket.VirtualSocketException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -27,7 +31,7 @@ import org.slf4j.LoggerFactory; * Worker thread class. Handles all downstream and upstream events in SSH Netty * pipeline. */ -public class SshClientAdapter implements Runnable { +class SshClientAdapter implements Runnable { private static final Logger logger = LoggerFactory.getLogger(SshClientAdapter.class); private static final int BUFFER_SIZE = 1024; @@ -37,7 +41,7 @@ public class SshClientAdapter implements Runnable { private OutputStream stdIn; - private Queue postponed = new LinkedList<>(); + private final Queue postponed = new LinkedList<>(); private ChannelHandlerContext ctx; private ChannelPromise disconnectPromise; @@ -46,47 +50,36 @@ public class SshClientAdapter implements Runnable { private final Object lock = new Object(); - public SshClientAdapter(SshClient sshClient, Invoker invoker) { + public SshClientAdapter(final SshClient sshClient, final Invoker invoker) { this.sshClient = sshClient; this.invoker = invoker; } + // TODO ganymed spawns a Thread that receives the data from remote inside TransportManager + // Get rid of this thread and reuse Ganymed internal thread (not sure if its possible without modifications in ganymed) public void run() { try { - SshSession session = sshClient.openSession(); + final SshSession session = sshClient.openSession(); invoker.invoke(session); - InputStream stdOut = session.getStdout(); - session.getStderr(); + final InputStream stdOut = session.getStdout(); synchronized (lock) { - stdIn = session.getStdin(); - ByteBuf message; - while ((message = postponed.poll()) != null) { - writeImpl(message); + while (postponed.peek() != null) { + writeImpl(postponed.poll()); } } while (!stopRequested.get()) { - byte[] readBuff = new byte[BUFFER_SIZE]; - int c = stdOut.read(readBuff); + final byte[] readBuff = new byte[BUFFER_SIZE]; + final int c = stdOut.read(readBuff); if (c == -1) { continue; } - byte[] tranBuff = new byte[c]; - System.arraycopy(readBuff, 0, tranBuff, 0, c); - ByteBuf byteBuf = Unpooled.buffer(c); - byteBuf.writeBytes(tranBuff); - ctx.fireChannelRead(byteBuf); + ctx.fireChannelRead(Unpooled.copiedBuffer(readBuff, 0, c)); } - - } catch (VirtualSocketException e) { - // Netty closed connection prematurely. - // Or maybe tried to open ganymed connection without having initialized session - // (ctx.channel().remoteAddress() is null) - // Just pass and move on. - } catch (Exception e) { + } catch (final Exception e) { logger.error("Unexpected exception", e); } finally { sshClient.close(); @@ -100,7 +93,7 @@ public class SshClientAdapter implements Runnable { } // TODO: needs rework to match netconf framer API. - public void write(ByteBuf message) throws IOException { + public void write(final ByteBuf message) throws IOException { synchronized (lock) { if (stdIn == null) { postponed.add(message); @@ -110,25 +103,36 @@ public class SshClientAdapter implements Runnable { } } - private void writeImpl(ByteBuf message) throws IOException { + private void writeImpl(final ByteBuf message) throws IOException { message.getBytes(0, stdIn, message.readableBytes()); message.release(); stdIn.flush(); } - public void stop(ChannelPromise promise) { + public void stop(final ChannelPromise promise) { synchronized (lock) { stopRequested.set(true); disconnectPromise = promise; } } - public void start(ChannelHandlerContext ctx) { - if (this.ctx != null) { - // context is already associated. - return; + public Thread start(final ChannelHandlerContext ctx, final ChannelFuture channelFuture) { + checkArgument(channelFuture.isSuccess()); + checkNotNull(ctx.channel().remoteAddress()); + synchronized (this) { + checkState(this.ctx == null); + this.ctx = ctx; } - this.ctx = ctx; - new Thread(this).start(); + final String threadName = toString(); + final Thread thread = new Thread(this, threadName); + thread.start(); + return thread; + } + + @Override + public String toString() { + return "SshClientAdapter{" + + "sshClient=" + sshClient + + '}'; } }