2 * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.netconf.nettyutil.handler.ssh.client;
11 import io.netty.buffer.ByteBuf;
12 import io.netty.buffer.Unpooled;
13 import io.netty.channel.ChannelHandlerContext;
14 import io.netty.channel.ChannelPromise;
15 import java.io.IOException;
16 import java.io.InputStream;
17 import java.io.OutputStream;
18 import java.util.LinkedList;
19 import java.util.Queue;
20 import java.util.concurrent.atomic.AtomicBoolean;
21 import org.opendaylight.controller.netconf.nettyutil.handler.ssh.virtualsocket.VirtualSocketException;
22 import org.slf4j.Logger;
23 import org.slf4j.LoggerFactory;
27 * Worker thread class. Handles all downstream and upstream events in SSH Netty
30 public class SshClientAdapter implements Runnable {
31 private static final Logger logger = LoggerFactory.getLogger(SshClientAdapter.class);
33 private static final int BUFFER_SIZE = 1024;
35 private final SshClient sshClient;
36 private final Invoker invoker;
38 private OutputStream stdIn;
40 private Queue<ByteBuf> postponed = new LinkedList<>();
42 private ChannelHandlerContext ctx;
43 private ChannelPromise disconnectPromise;
45 private final AtomicBoolean stopRequested = new AtomicBoolean(false);
47 private final Object lock = new Object();
49 public SshClientAdapter(SshClient sshClient, Invoker invoker) {
50 this.sshClient = sshClient;
51 this.invoker = invoker;
57 session = sshClient.openSession();
58 } catch (IOException e) {
59 logger.error("Cannot establish session", e);
64 invoker.invoke(session);
65 InputStream stdOut = session.getStdout();
70 stdIn = session.getStdin();
72 while ((message = postponed.poll()) != null) {
77 while (!stopRequested.get()) {
78 byte[] readBuff = new byte[BUFFER_SIZE];
79 int c = stdOut.read(readBuff);
83 byte[] tranBuff = new byte[c];
84 System.arraycopy(readBuff, 0, tranBuff, 0, c);
86 ByteBuf byteBuf = Unpooled.buffer(c);
87 byteBuf.writeBytes(tranBuff);
88 ctx.fireChannelRead(byteBuf);
91 } catch (VirtualSocketException e) {
92 // Netty closed connection prematurely.
93 // Just pass and move on.
94 } catch (Exception e) {
95 logger.error("Unexpected exception", e);
100 if (disconnectPromise != null) {
101 ctx.disconnect(disconnectPromise);
107 // TODO: needs rework to match netconf framer API.
108 public void write(ByteBuf message) throws IOException {
109 synchronized (lock) {
111 postponed.add(message);
118 private void writeImpl(ByteBuf message) throws IOException {
119 message.getBytes(0, stdIn, message.readableBytes());
123 public void stop(ChannelPromise promise) {
124 synchronized (lock) {
125 stopRequested.set(true);
126 disconnectPromise = promise;
130 public void start(ChannelHandlerContext ctx) {
131 if (this.ctx != null) {
132 // context is already associated.
136 new Thread(this).start();