2 * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.netconf.nettyutil.handler.ssh.client;
11 import static com.google.common.base.Preconditions.checkArgument;
12 import static com.google.common.base.Preconditions.checkNotNull;
13 import static com.google.common.base.Preconditions.checkState;
15 import io.netty.buffer.ByteBuf;
16 import io.netty.buffer.Unpooled;
17 import io.netty.channel.ChannelFuture;
18 import io.netty.channel.ChannelHandlerContext;
19 import io.netty.channel.ChannelPromise;
20 import java.io.IOException;
21 import java.io.InputStream;
22 import java.io.OutputStream;
23 import java.util.LinkedList;
24 import java.util.Queue;
25 import java.util.concurrent.atomic.AtomicBoolean;
26 import org.slf4j.Logger;
27 import org.slf4j.LoggerFactory;
31 * Worker thread class. Handles all downstream and upstream events in SSH Netty
34 class SshClientAdapter implements Runnable {
// SLF4J logger for this class.
35 private static final Logger logger = LoggerFactory.getLogger(SshClientAdapter.class);
// Size in bytes of the array allocated for each read from the session's stdout.
37 private static final int BUFFER_SIZE = 1024;
// Client used to open the SSH session that the worker reads from and writes to.
39 private final SshClient sshClient;
// Invoked once with the freshly opened session, before its streams are used.
40 private final Invoker invoker;
// Session stdin; assigned by the worker after the session is opened, and the target of writeImpl().
42 private OutputStream stdIn;
// Messages queued by write(); the worker drains them through writeImpl() right after stdIn is assigned.
44 private final Queue<ByteBuf> postponed = new LinkedList<>();
// Netty context used to fire inbound reads and to disconnect; start() requires it to be null on entry.
46 private ChannelHandlerContext ctx;
// Promise recorded by stop(); if non-null it is passed to ctx.disconnect() by the worker.
47 private ChannelPromise disconnectPromise;
// Set to true by stop(); the worker's read loop keeps running while this is false.
49 private final AtomicBoolean stopRequested = new AtomicBoolean(false);
// Monitor that stop() synchronizes on while updating stopRequested and disconnectPromise.
51 private final Object lock = new Object();
/**
 * Creates an adapter around the given SSH client. Only stores the two collaborators;
 * no session is opened here.
 *
 * @param sshClient client later used to open the SSH session
 * @param invoker   callback later invoked with the opened session
 */
53 public SshClientAdapter(final SshClient sshClient, final Invoker invoker) {
54 this.sshClient = sshClient;
55 this.invoker = invoker;
58 // TODO: Ganymed spawns a thread inside TransportManager that receives the data from the remote side.
59 // Get rid of this thread and reuse Ganymed's internal thread (not sure if it's possible without modifications to Ganymed).
// Worker body (presumably Runnable.run(); the method header is not visible in this view).
// Opens the SSH session, hands it to the invoker, and takes its stdout for reading.
62 final SshSession session = sshClient.openSession();
63 invoker.invoke(session);
64 final InputStream stdOut = session.getStdout();
// Publish the session's stdin, then flush every message queued before it became available.
67 stdIn = session.getStdin();
68 while (postponed.peek() != null) {
69 writeImpl(postponed.poll());
// Read loop: runs until stop() sets stopRequested.
73 while (!stopRequested.get()) {
// A fresh array is allocated on every iteration.
74 final byte[] readBuff = new byte[BUFFER_SIZE];
// c is the value returned by InputStream.read: bytes read, or -1 at end of stream.
75 final int c = stdOut.read(readBuff);
// NOTE(review): handling of c == -1 is not visible here — confirm it is checked before copiedBuffer.
// Only the first c bytes are copied into a new ByteBuf and passed up the pipeline.
80 ctx.fireChannelRead(Unpooled.copiedBuffer(readBuff, 0, c));
// Any exception from the statements above is logged here; no rethrow is visible in this view.
82 } catch (final Exception e) {
83 logger.error("Unexpected exception", e);
// If stop() recorded a promise, disconnect the channel with it.
88 if (disconnectPromise != null) {
89 ctx.disconnect(disconnectPromise);
95 // TODO: needs rework to match netconf framer API.
/**
 * Accepts an outbound message for the SSH session.
 *
 * @param message buffer to send
 * @throws IOException declared by the signature; the throwing path is not visible in this view
 */
96 public void write(final ByteBuf message) throws IOException {
// Queue the message for later delivery; the worker drains this queue via writeImpl().
// NOTE(review): the condition guarding this branch is not visible — presumably stdIn is not yet set; confirm.
99 postponed.add(message);
/**
 * Copies the message's bytes into the session's stdin stream.
 *
 * @param message buffer whose content is written to stdIn
 * @throws IOException if writing to stdIn fails
 */
106 private void writeImpl(final ByteBuf message) throws IOException {
// Copies readableBytes() bytes starting at absolute index 0; getBytes does not move the reader index.
// NOTE(review): index 0 differs from message.readerIndex() when the reader index is non-zero —
// confirm callers always pass buffers with readerIndex == 0, or switch to readerIndex().
107 message.getBytes(0, stdIn, message.readableBytes());
/**
 * Requests that the worker's read loop stop and records the promise to use for the disconnect.
 *
 * @param promise promise the worker later passes to ctx.disconnect() if non-null
 */
112 public void stop(final ChannelPromise promise) {
// Both updates happen under the same monitor.
113 synchronized (lock) {
114 stopRequested.set(true);
115 disconnectPromise = promise;
/**
 * Validates the connection state and creates the worker thread for this adapter.
 *
 * @param ctx           channel context; its channel must have a non-null remote address
 * @param channelFuture connect future; must already be successful
 * @return a Thread (return statement not visible in this view — presumably the created worker thread)
 * @throws IllegalArgumentException if channelFuture is not successful
 * @throws NullPointerException     if the channel has no remote address
 * @throws IllegalStateException    if a context has already been set on this adapter
 */
119 public Thread start(final ChannelHandlerContext ctx, final ChannelFuture channelFuture) {
120 checkArgument(channelFuture.isSuccess());
121 checkNotNull(ctx.channel().remoteAddress());
// NOTE(review): this synchronizes on `this`, whereas stop() synchronizes on `lock` — confirm that is intended.
122 synchronized (this) {
// The context must not have been set before.
123 checkState(this.ctx == null);
// The thread is named after this adapter's toString() and runs this adapter as its Runnable.
126 final String threadName = toString();
127 final Thread thread = new Thread(this, threadName);
133 public String toString() {
134 return "SshClientAdapter{" +
135 "sshClient=" + sshClient +