Merge "BUG 1179 - rpcResult = false but response 200"
[controller.git] / opendaylight / netconf / netconf-netty-util / src / main / java / org / opendaylight / controller / netconf / nettyutil / handler / ssh / client / SshClientAdapter.java
1 /*
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.netconf.nettyutil.handler.ssh.client;
10
11 import io.netty.buffer.ByteBuf;
12 import io.netty.buffer.Unpooled;
13 import io.netty.channel.ChannelHandlerContext;
14 import io.netty.channel.ChannelPromise;
15 import java.io.IOException;
16 import java.io.InputStream;
17 import java.io.OutputStream;
18 import java.util.LinkedList;
19 import java.util.Queue;
20 import java.util.concurrent.atomic.AtomicBoolean;
21 import org.opendaylight.controller.netconf.nettyutil.handler.ssh.virtualsocket.VirtualSocketException;
22 import org.slf4j.Logger;
23 import org.slf4j.LoggerFactory;
24
25
26 /**
27  * Worker thread class. Handles all downstream and upstream events in SSH Netty
28  * pipeline.
29  */
30 public class SshClientAdapter implements Runnable {
31     private static final Logger logger = LoggerFactory.getLogger(SshClientAdapter.class);
32
33     private static final int BUFFER_SIZE = 1024;
34
35     private final SshClient sshClient;
36     private final Invoker invoker;
37
38     private OutputStream stdIn;
39
40     private Queue<ByteBuf> postponed = new LinkedList<>();
41
42     private ChannelHandlerContext ctx;
43     private ChannelPromise disconnectPromise;
44
45     private final AtomicBoolean stopRequested = new AtomicBoolean(false);
46
47     private final Object lock = new Object();
48
49     public SshClientAdapter(SshClient sshClient, Invoker invoker) {
50         this.sshClient = sshClient;
51         this.invoker = invoker;
52     }
53
54     public void run() {
55         SshSession session;
56         try {
57             session = sshClient.openSession();
58         } catch (IOException e) {
59             logger.error("Cannot establish session", e);
60             sshClient.close();
61             return;
62         }
63         try {
64             invoker.invoke(session);
65             InputStream stdOut = session.getStdout();
66             session.getStderr();
67
68             synchronized (lock) {
69
70                 stdIn = session.getStdin();
71                 ByteBuf message;
72                 while ((message = postponed.poll()) != null) {
73                     writeImpl(message);
74                 }
75             }
76
77             while (!stopRequested.get()) {
78                 byte[] readBuff = new byte[BUFFER_SIZE];
79                 int c = stdOut.read(readBuff);
80                 if (c == -1) {
81                     continue;
82                 }
83                 byte[] tranBuff = new byte[c];
84                 System.arraycopy(readBuff, 0, tranBuff, 0, c);
85
86                 ByteBuf byteBuf = Unpooled.buffer(c);
87                 byteBuf.writeBytes(tranBuff);
88                 ctx.fireChannelRead(byteBuf);
89             }
90
91         } catch (VirtualSocketException e) {
92             // Netty closed connection prematurely.
93             // Just pass and move on.
94         } catch (Exception e) {
95             logger.error("Unexpected exception", e);
96         } finally {
97             sshClient.close();
98
99             synchronized (lock) {
100                 if (disconnectPromise != null) {
101                     ctx.disconnect(disconnectPromise);
102                 }
103             }
104         }
105     }
106
107     // TODO: needs rework to match netconf framer API.
108     public void write(ByteBuf message) throws IOException {
109         synchronized (lock) {
110             if (stdIn == null) {
111                 postponed.add(message);
112                 return;
113             }
114             writeImpl(message);
115         }
116     }
117
118     private void writeImpl(ByteBuf message) throws IOException {
119         message.getBytes(0, stdIn, message.readableBytes());
120         stdIn.flush();
121     }
122
123     public void stop(ChannelPromise promise) {
124         synchronized (lock) {
125             stopRequested.set(true);
126             disconnectPromise = promise;
127         }
128     }
129
130     public void start(ChannelHandlerContext ctx) {
131         if (this.ctx != null) {
132             // context is already associated.
133             return;
134         }
135         this.ctx = ctx;
136         new Thread(this).start();
137     }
138 }