2 * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.netconf.nettyutil.handler.ssh.client;
11 import static com.google.common.base.Preconditions.checkArgument;
12 import static com.google.common.base.Preconditions.checkNotNull;
13 import static com.google.common.base.Preconditions.checkState;
15 import io.netty.buffer.ByteBuf;
16 import io.netty.buffer.Unpooled;
17 import io.netty.channel.ChannelFuture;
18 import io.netty.channel.ChannelHandlerContext;
19 import io.netty.channel.ChannelPromise;
20 import java.io.IOException;
21 import java.io.InputStream;
22 import java.io.OutputStream;
23 import java.util.LinkedList;
24 import java.util.Queue;
25 import java.util.concurrent.atomic.AtomicBoolean;
26 import org.slf4j.Logger;
27 import org.slf4j.LoggerFactory;
31 * Worker thread class. Handles all downstream and upstream events in SSH Netty
34 class SshClientAdapter implements Runnable {
// Class-wide SLF4J logger.
35 private static final Logger logger = LoggerFactory.getLogger(SshClientAdapter.class);
// Size, in bytes, of the array allocated for each read from the session's stdout.
37 private static final int BUFFER_SIZE = 1024;
// Client used to open the SSH session.
39 private final SshClient sshClient;
// Callback that is handed the freshly opened session before any I/O is done on it.
40 private final Invoker invoker;
// Stream obtained from SshSession.getStdin(); outgoing messages are copied into it by writeImpl.
42 private OutputStream stdIn;
// Messages queued by write(); the worker polls this queue empty right after stdIn is assigned.
44 private Queue<ByteBuf> postponed = new LinkedList<>();
// Netty context used to fire inbound reads and to issue the final disconnect; start() asserts it is still null.
46 private ChannelHandlerContext ctx;
// Promise recorded by stop(); when non-null it is passed to ctx.disconnect by the worker.
47 private ChannelPromise disconnectPromise;
// Set to true by stop(); the worker's read loop runs only while this is false.
49 private final AtomicBoolean stopRequested = new AtomicBoolean(false);
// Monitor used by write() and stop() (start() synchronizes on `this` instead).
51 private final Object lock = new Object();
/**
 * Creates an adapter around the given SSH client. Only stores the two
 * collaborators; the session itself is opened later by the worker code.
 *
 * @param sshClient client used to open the SSH session
 * @param invoker   callback invoked with the opened session
 */
53 public SshClientAdapter(SshClient sshClient, Invoker invoker) {
54 this.sshClient = sshClient;
55 this.invoker = invoker;
// NOTE(review): the embedded line numbers jump here (55 -> 61, 63 -> 68, 77 -> 81, ...),
// so the enclosing method signature (presumably Runnable.run()), the try/finally
// scaffolding and some statements are not visible. Comments below describe only
// the statements that are shown.

// Open the SSH session, let the invoker act on it, then take its stdout for reading.
61 SshSession session = sshClient.openSession();
62 invoker.invoke(session);
63 InputStream stdOut = session.getStdout();
// Publish the session's stdin so that outgoing messages can be written to it.
68 stdIn = session.getStdin();
// Drain everything write() queued before stdIn existed.
// NOTE(review): the loop body is not visible - presumably each message goes to writeImpl; confirm.
70 while ((message = postponed.poll()) != null) {
// Read loop: keeps pulling bytes from the session's stdout until stop() sets the flag.
75 while (!stopRequested.get()) {
// A fresh BUFFER_SIZE array is allocated on every iteration.
76 byte[] readBuff = new byte[BUFFER_SIZE];
77 int c = stdOut.read(readBuff);
// NOTE(review): InputStream.read returns -1 at end of stream, which would make
// `new byte[c]` throw; confirm the lines not shown here (78-80) guard against it.
// Copy exactly the c bytes that were read into a right-sized array ...
81 byte[] tranBuff = new byte[c];
82 System.arraycopy(readBuff, 0, tranBuff, 0, c);
// ... then copy them again into a ByteBuf and pass it up the Netty pipeline.
// NOTE(review): the intermediate tranBuff copy looks redundant (writeBytes has an
// offset/length overload) - candidate for cleanup once the full method is in view.
84 ByteBuf byteBuf = Unpooled.buffer(c);
85 byteBuf.writeBytes(tranBuff);
86 ctx.fireChannelRead(byteBuf);
// Any exception from the session setup or the read loop is logged, not rethrown here.
88 } catch (Exception e) {
89 logger.error("Unexpected exception", e);
// If stop() supplied a promise, disconnect the channel and complete it through that promise.
94 if (disconnectPromise != null) {
95 ctx.disconnect(disconnectPromise);
101 // TODO: needs rework to match netconf framer API.
/**
 * Accepts an outgoing message. While holding {@code lock}, the message is
 * added to the {@code postponed} queue.
 *
 * NOTE(review): the lines around the enqueue (104, 106-111) are not visible;
 * presumably the message is queued only while stdIn is not yet set and is
 * otherwise written directly - confirm against the full file.
 *
 * @param message buffer holding the bytes to send
 * @throws IOException declared by the signature; the throwing call is not visible in this excerpt
 */
102 public void write(ByteBuf message) throws IOException {
103 synchronized (lock) {
105 postponed.add(message);
/**
 * Copies the content of the message into the session's stdin stream.
 *
 * @param message buffer whose bytes are copied to {@code stdIn}
 * @throws IOException if writing to {@code stdIn} fails
 */
112 private void writeImpl(ByteBuf message) throws IOException {
// Copies readableBytes() bytes starting at absolute index 0 (not at readerIndex).
// NOTE(review): this matches the readable region only when readerIndex is 0 - confirm callers.
// NOTE(review): lines 114-117 are not visible; any flush/release of the buffer would be there.
113 message.getBytes(0, stdIn, message.readableBytes());
/**
 * Requests the worker to stop: under {@code lock}, raises the
 * {@code stopRequested} flag checked by the read loop and records the
 * promise that the worker later passes to {@code ctx.disconnect}.
 *
 * @param promise promise to use for the disconnect; the worker skips the disconnect when it is null
 */
118 public void stop(ChannelPromise promise) {
119 synchronized (lock) {
120 stopRequested.set(true);
121 disconnectPromise = promise;
/**
 * Validates the channel state and prepares the worker thread for this adapter.
 *
 * NOTE(review): lines 130-131 and 134-138 are not visible; presumably
 * {@code this.ctx} is assigned inside the synchronized block and the thread
 * is started and returned - confirm against the full file.
 *
 * @param ctx           channel context; its channel must have a non-null remote address
 * @param channelFuture future that must already have completed successfully
 * @return per the signature, a {@link Thread} (the return statement is not visible here)
 * @throws IllegalArgumentException if {@code channelFuture} is not successful
 * @throws NullPointerException     if the channel's remote address is null
 * @throws IllegalStateException    if a context has already been set on this adapter
 */
125 public Thread start(ChannelHandlerContext ctx, ChannelFuture channelFuture) {
126 checkArgument(channelFuture.isSuccess());
127 checkNotNull(ctx.channel().remoteAddress());
// Guards against a second start. Note this synchronizes on `this`, whereas write()/stop() use `lock`.
128 synchronized (this) {
129 checkState(this.ctx == null);
// The worker thread is named after this adapter's toString() value.
132 String threadName = toString();
133 Thread thread = new Thread(this, threadName);
139 public String toString() {
140 return "SshClientAdapter{" +
141 "sshClient=" + sshClient +