2 * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.netconf.nettyutil.handler.ssh.client;
11 import io.netty.buffer.ByteBuf;
12 import io.netty.buffer.Unpooled;
13 import io.netty.channel.ChannelHandlerContext;
14 import io.netty.channel.ChannelPromise;
15 import java.io.IOException;
16 import java.io.InputStream;
17 import java.io.OutputStream;
18 import java.util.LinkedList;
19 import java.util.Queue;
20 import java.util.concurrent.atomic.AtomicBoolean;
21 import org.opendaylight.controller.netconf.nettyutil.handler.ssh.virtualsocket.VirtualSocketException;
22 import org.slf4j.Logger;
23 import org.slf4j.LoggerFactory;
27 * Worker thread class. Handles all downstream and upstream events of the SSH Netty client.
30 public class SshClientAdapter implements Runnable {
// Purpose: a Runnable that opens an SSH session, forwards bytes read from the
// session to a Netty ChannelHandlerContext, and copies outgoing ByteBuf messages
// to the session. It is started on a new Thread by start().
// Class-wide SLF4J logger; used to report unexpected exceptions from the worker.
31 private static final Logger logger = LoggerFactory.getLogger(SshClientAdapter.class);
// Size, in bytes, of the array allocated for each read from the session output stream.
33 private static final int BUFFER_SIZE = 1024;
// Client used to open the SSH session.
35 private final SshClient sshClient;
// Callback that is handed the freshly opened session before any stream is used.
36 private final Invoker invoker;
// Stream obtained from session.getStdin(); outgoing message bytes are copied into it.
// It is only assigned once the worker has opened the session.
38 private OutputStream stdIn;
// Messages queued by write() and drained by the worker after stdIn is assigned.
40 private Queue<ByteBuf> postponed = new LinkedList<>();
// Context on which inbound data is fired and on which disconnect is requested.
42 private ChannelHandlerContext ctx;
// Promise recorded by stop(); when non-null the worker passes it to ctx.disconnect().
43 private ChannelPromise disconnectPromise;
// Set to true by stop(); the worker's read loop runs while it is false.
45 private final AtomicBoolean stopRequested = new AtomicBoolean(false);
// Monitor used by write() and stop() in their synchronized sections.
47 private final Object lock = new Object();
/**
 * Creates an adapter bound to the given SSH client and session callback.
 * Nothing is opened here; both arguments are only stored.
 *
 * @param sshClient client later used to open the SSH session
 * @param invoker callback later invoked with the opened session
 */
49 public SshClientAdapter(SshClient sshClient, Invoker invoker) {
50 this.sshClient = sshClient;
51 this.invoker = invoker;
// Worker body: open the session, drain queued messages, then pump session output
// into the Netty context until a stop is requested.
// NOTE(review): the enclosing method header is not visible in this excerpt; since
// the class implements Runnable this is presumably run() — confirm.
56 SshSession session = sshClient.openSession();
// Let the callback act on the session before its streams are used.
57 invoker.invoke(session);
58 InputStream stdOut = session.getStdout();
// From here on outgoing messages have a destination stream.
63 stdIn = session.getStdin();
// Drain every message that was queued before stdIn existed; poll() returns null
// once the queue is empty.
// NOTE(review): the loop body is not visible here; presumably each message is
// passed to writeImpl — confirm.
65 while ((message = postponed.poll()) != null) {
// Read loop: runs until stop() flips stopRequested.
70 while (!stopRequested.get()) {
71 byte[] readBuff = new byte[BUFFER_SIZE];
// c is the number of bytes actually read into readBuff.
72 int c = stdOut.read(readBuff);
// NOTE(review): InputStream.read returns -1 at end of stream, and new byte[c]
// would then throw NegativeArraySizeException; no guard is visible in this
// excerpt — confirm one exists between the read and the allocation.
76 byte[] tranBuff = new byte[c];
// Keep only the c bytes that were read.
77 System.arraycopy(readBuff, 0, tranBuff, 0, c);
// Wrap the received bytes in a new ByteBuf and fire them as a channel-read event on ctx.
79 ByteBuf byteBuf = Unpooled.buffer(c);
80 byteBuf.writeBytes(tranBuff);
81 ctx.fireChannelRead(byteBuf);
84 } catch (VirtualSocketException e) {
85 // Netty closed connection prematurely.
86 // Or maybe tried to open ganymed connection without having initialized session
87 // (ctx.channel().remoteAddress() is null)
88 // Just pass and move on.
89 } catch (Exception e) {
// Any other failure is logged with its stack trace rather than propagated.
90 logger.error("Unexpected exception", e);
// If stop() supplied a promise, request the disconnect with it.
95 if (disconnectPromise != null) {
96 ctx.disconnect(disconnectPromise);
102 // TODO: needs rework to match netconf framer API.
/**
 * Accepts an outgoing message. The work is done while holding {@code lock}.
 * The visible path appends the message to the {@code postponed} queue.
 * NOTE(review): the condition guarding the queueing and the direct-write path
 * are not visible in this excerpt; presumably the message is queued only while
 * {@code stdIn} is not yet available — confirm.
 *
 * @param message buffer holding the bytes to send
 * @throws IOException declared by the signature; presumably raised by the
 *         direct-write path — confirm
 */
103 public void write(ByteBuf message) throws IOException {
104 synchronized (lock) {
106 postponed.add(message);
/**
 * Copies the readable bytes of {@code message} into {@code stdIn}.
 *
 * @param message buffer whose content is transferred
 * @throws IOException if writing to {@code stdIn} fails
 */
113 private void writeImpl(ByteBuf message) throws IOException {
// Transfers readableBytes() bytes starting at absolute index 0 of the buffer.
// NOTE(review): absolute index 0 equals the reader index only for a buffer that
// has not been partially read — confirm callers always pass such buffers.
114 message.getBytes(0, stdIn, message.readableBytes());
/**
 * Requests that the worker stop and records the promise to use for the disconnect.
 * Both updates are made while holding {@code lock}.
 *
 * @param promise promise the worker later passes to {@code ctx.disconnect}
 */
119 public void stop(ChannelPromise promise) {
120 synchronized (lock) {
// Makes the worker's read loop condition false on its next check.
121 stopRequested.set(true);
122 disconnectPromise = promise;
/**
 * Starts the worker on a new thread.
 * NOTE(review): the early exit for an already-associated context and the
 * assignment of {@code this.ctx} are not visible in this excerpt — confirm both.
 *
 * @param ctx channel handler context to associate with this adapter
 */
126 public void start(ChannelHandlerContext ctx) {
127 if (this.ctx != null) {
128 // context is already associated.
// Runs this adapter (a Runnable) on a freshly created thread.
132 new Thread(this).start();