2 * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.netconf.util.handler.ssh.client;
11 import io.netty.buffer.ByteBuf;
12 import io.netty.buffer.Unpooled;
13 import io.netty.channel.ChannelHandlerContext;
14 import io.netty.channel.ChannelPromise;
16 import java.io.IOException;
17 import java.io.InputStream;
18 import java.io.OutputStream;
19 import java.util.LinkedList;
20 import java.util.Queue;
21 import java.util.concurrent.atomic.AtomicBoolean;
22 import org.opendaylight.controller.netconf.util.handler.ssh.virtualsocket.VirtualSocketException;
26 * Worker thread class. Handles all downstream and upstream events in SSH Netty
29 public class SshClientAdapter implements Runnable {
// Size in bytes of the scratch array allocated for each read from the session's stdout.
30 private static final int BUFFER_SIZE = 1024;
// Source of the SSH session; openSession() is called on it to obtain the session.
32 private final SshClient sshClient;
// Callback that is handed the freshly opened session before its streams are fetched.
33 private final Invoker invoker;
// Session stdin stream; assigned from session.getStdin() and used as the sink in writeImpl.
35 private OutputStream stdIn;
// Messages queued by write(); drained with poll() after stdIn has been assigned.
37 private Queue<ByteBuf> postponed = new LinkedList<>();
// Channel context used for fireChannelRead(...) and for the final disconnect(...).
39 private ChannelHandlerContext ctx;
// Promise recorded by stop(); when non-null it is passed to ctx.disconnect(...).
40 private ChannelPromise disconnectPromise;
// Set to true by stop(); the read loop tests it before every read.
42 private final AtomicBoolean stopRequested = new AtomicBoolean(false);
// Monitor held by stop() while it updates stopRequested and disconnectPromise.
44 private final Object lock = new Object();
/**
 * Creates an adapter bound to the given client and invoker; both references are stored
 * for later use.
 *
 * @param sshClient client whose {@code openSession()} supplies the SSH session
 * @param invoker   callback that is handed the session once it has been opened
 */
46 public SshClientAdapter(SshClient sshClient, Invoker invoker) {
47 this.sshClient = sshClient;
48 this.invoker = invoker;
// Worker body (presumably run(); the method header is not among the visible lines).
// Opens the SSH session and lets the invoker act on it before the streams are fetched.
53 SshSession session = sshClient.openSession();
54 invoker.invoke(session);
55 InputStream stdOut = session.getStdout();
// Store the session's stdin so that writeImpl has a stream to write to.
60 stdIn = session.getStdin();
// Drain every message queued in postponed; poll() returns null once the queue is empty.
62 while ((message = postponed.poll()) != null) {
// Read loop: keeps going until stop() sets stopRequested.
67 while (!stopRequested.get()) {
// A fresh BUFFER_SIZE-byte scratch array is allocated on every iteration.
68 byte[] readBuff = new byte[BUFFER_SIZE];
69 int c = stdOut.read(readBuff);
// NOTE(review): InputStream.read returns -1 at end of stream, and new byte[-1] throws
// NegativeArraySizeException. No guard is among the visible lines - confirm one exists.
73 byte[] tranBuff = new byte[c];
// Keep only the c bytes that were actually read.
74 System.arraycopy(readBuff, 0, tranBuff, 0, c);
// Copy the bytes into a ByteBuf and hand it to ctx.fireChannelRead(...).
76 ByteBuf byteBuf = Unpooled.buffer(c);
77 byteBuf.writeBytes(tranBuff);
78 ctx.fireChannelRead(byteBuf);
81 } catch (VirtualSocketException e) {
82 // Netty closed connection prematurely.
83 // Just pass and move on.
// Any other failure is rethrown unchecked, with the original exception as its cause.
84 } catch (Exception e) {
85 throw new IllegalStateException(e);
// Disconnect only if stop() supplied a promise.
90 if (disconnectPromise != null) {
91 ctx.disconnect(disconnectPromise);
97 // TODO: needs rework to match netconf framer API.
/**
 * Accepts an outbound message for the SSH session.
 *
 * Only the statement that queues the message in {@code postponed} is visible here.
 * NOTE(review): presumably the message is queued only while {@code stdIn} is not yet
 * available and is otherwise written directly - confirm against the full method body.
 *
 * @param message buffer to send to the session
 * @throws IOException declared by the signature; presumably raised by the direct write
 *                     path, which is not visible here - confirm
 */
98 public void write(ByteBuf message) throws IOException {
101 postponed.add(message);
/**
 * Copies bytes from the message to the session's stdin stream.
 *
 * @param message buffer whose bytes are transferred to {@code stdIn}
 * @throws IOException if writing to {@code stdIn} fails
 */
108 private void writeImpl(ByteBuf message) throws IOException {
// NOTE(review): the copy starts at absolute index 0, but its length is readableBytes()
// (writerIndex - readerIndex). The two agree only while readerIndex is 0. If callers can
// pass a partially consumed buffer, message.readerIndex() is the matching start index -
// confirm. getBytes(int, OutputStream, int) does not move readerIndex.
109 message.getBytes(0, stdIn, message.readableBytes());
/**
 * Requests termination of the read loop and records the promise for the disconnect.
 *
 * NOTE(review): the flag is tested only at the top of the read loop, and nothing visible
 * here interrupts a read that is already blocked - confirm how a pending read is released.
 *
 * @param promise stored in {@code disconnectPromise}; the worker passes it to
 *                {@code ctx.disconnect(...)} when it is non-null
 */
113 public void stop(ChannelPromise promise) {
// Both updates are made while holding lock.
114 synchronized (lock) {
115 stopRequested.set(true);
116 disconnectPromise = promise;
/**
 * Starts this adapter on a new thread for the given channel context.
 *
 * NOTE(review): the body of the already-associated branch and the assignment of
 * {@code this.ctx} are not among the visible lines. Presumably a second call returns
 * without starting another thread - confirm.
 *
 * @param ctx channel context the worker is to use
 */
120 public void start(ChannelHandlerContext ctx) {
121 if (this.ctx != null) {
122 // context is already associated.
// The adapter itself is the Runnable executed by the new thread.
126 new Thread(this).start();