2 * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.netconf.util.handler.ssh.client;
11 import io.netty.buffer.ByteBuf;
12 import io.netty.buffer.Unpooled;
13 import io.netty.channel.ChannelHandlerContext;
14 import io.netty.channel.ChannelPromise;
16 import java.io.IOException;
17 import java.io.InputStream;
18 import java.io.OutputStream;
19 import java.util.LinkedList;
20 import java.util.Queue;
21 import java.util.concurrent.atomic.AtomicBoolean;
22 import org.opendaylight.controller.netconf.util.handler.ssh.virtualsocket.VirtualSocketException;
26 * Worker thread class. Handles all downstream and upstream events in the SSH Netty channel.
29 public class SshClientAdapter implements Runnable {
// Client used to open the SSH session.
30 private final SshClient sshClient;
// Invoked with the freshly opened session before its streams are fetched.
31 private final Invoker invoker;
// Session obtained from sshClient.openSession().
33 private SshSession session;
// Session stdout; the read loop pulls incoming bytes from it.
34 private InputStream stdOut;
// Session stderr; assigned, but not read anywhere in the visible code.
35 private InputStream stdErr;
// Session stdin; writeImpl() copies outgoing buffers into it.
36 private OutputStream stdIn;
// Messages queued by write(); polled in a loop right after stdIn is assigned.
38 private Queue<ByteBuf> postponned = new LinkedList<>();
// Netty context used to fire channel reads and to issue the disconnect.
41 private ChannelHandlerContext ctx;
// Promise recorded by stop(); passed to ctx.disconnect() when non-null.
42 private ChannelPromise disconnectPromise;
// Set to true by stop(); the read loop keeps running while this is false.
44 private final AtomicBoolean stopRequested = new AtomicBoolean(false);
// Monitor used by the synchronized sections in write() and stop().
46 private final Object lock = new Object();
/**
 * Stores the two collaborators; the visible lines only assign fields.
 *
 * @param sshClient client later used to open the SSH session
 * @param invoker   callback later invoked with the opened session
 */
48 public SshClientAdapter(SshClient sshClient, Invoker invoker) {
49 this.sshClient = sshClient;
50 this.invoker = invoker;
// NOTE(review): the enclosing method header and several lines (braces, the
// try opener, the body of the drain loop) are not visible in this excerpt.
// Presumably this is the body of run(), executed on the thread created in
// start() - confirm against the full file.
// Open the session and let the invoker act on it before fetching the streams.
55 session = sshClient.openSession();
56 invoker.invoke(session);
57 stdOut = session.getStdout();
58 stdErr = session.getStderr();
// Once stdIn is assigned, messages queued by write() are polled one by one.
62 stdIn = session.getStdin();
63 ByteBuf message = null;
64 while ((message = postponned.poll()) != null) {
// Read loop: keeps going until stop() sets stopRequested.
69 while (stopRequested.get() == false) {
70 byte[] readBuff = new byte[1024];
// Reads at most 1024 bytes; c is the count reported by read().
71 int c = stdOut.read(readBuff);
// NOTE(review): no handling of c == -1 (end of stream) is visible here, and
// new byte[-1] would throw - confirm the elided lines guard against it.
75 byte[] tranBuff = new byte[c];
76 System.arraycopy(readBuff, 0, tranBuff, 0, c);
// Wrap exactly the bytes read in a fresh buffer and hand it to the pipeline.
78 ByteBuf byteBuf = Unpooled.buffer(c);
79 byteBuf.writeBytes(tranBuff);
80 ctx.fireChannelRead(byteBuf);
83 } catch (VirtualSocketException e) {
84 // Netty closed connection prematurely.
85 // Just pass and move on.
86 } catch (Exception e) {
// Any other failure is rethrown unchecked, keeping the original as the cause.
87 throw new RuntimeException(e);
// Issue the disconnect only if stop() supplied a promise.
92 if (disconnectPromise != null)
93 ctx.disconnect(disconnectPromise);
98 // TODO: needs rework to match netconf framer API.
/**
 * Accepts an outgoing message while holding {@code lock}.
 * In the visible lines the message is appended to the postponed queue.
 * NOTE(review): the condition guarding the enqueue (presumably "stdIn is
 * not available yet") and any direct-write path are not visible in this
 * excerpt - confirm against the full file.
 *
 * @param message buffer to send
 * @throws IOException declared on the signature; presumably propagated
 *                     from writeImpl - confirm
 */
99 public void write(ByteBuf message) throws IOException {
100 synchronized (lock) {
102 postponned.add(message);
/**
 * Copies {@code message.readableBytes()} bytes of {@code message},
 * starting at absolute index 0, into {@code stdIn}.
 * NOTE(review): the copy starts at index 0 rather than
 * {@code message.readerIndex()}; if a buffer ever arrives with a non-zero
 * reader index, the wrong byte range would be written - confirm callers
 * always pass unread buffers.
 *
 * @param message buffer whose bytes are written to the session's stdin
 * @throws IOException if writing to {@code stdIn} fails
 */
109 private void writeImpl(ByteBuf message) throws IOException {
110 message.getBytes(0, stdIn, message.readableBytes());
/**
 * Requests termination of the read loop and records the promise to be
 * used for the later disconnect. Both updates happen while holding
 * {@code lock}.
 *
 * @param promise promise later passed to {@code ctx.disconnect(...)}
 */
114 public void stop(ChannelPromise promise) {
115 synchronized (lock) {
116 stopRequested.set(true);
117 disconnectPromise = promise;
/**
 * Starts a new thread running this adapter, unless a context is already
 * associated, in which case the call returns without doing anything.
 * NOTE(review): the assignment of {@code this.ctx} is not visible in this
 * excerpt (a line is elided between the guard and the thread start) -
 * confirm against the full file.
 *
 * @param ctx channel handler context for this adapter
 */
121 public void start(ChannelHandlerContext ctx) {
122 if (this.ctx != null)
123 return; // context is already associated.
125 new Thread(this).start();