Merge "Choice and case resolving in JSON output"
[controller.git] / opendaylight / netconf / netconf-util / src / main / java / org / opendaylight / controller / netconf / util / handler / ssh / client / SshClientAdapter.java
1 /*
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.netconf.util.handler.ssh.client;
10
11 import io.netty.buffer.ByteBuf;
12 import io.netty.buffer.Unpooled;
13 import io.netty.channel.ChannelHandlerContext;
14 import io.netty.channel.ChannelPromise;
15
16 import java.io.IOException;
17 import java.io.InputStream;
18 import java.io.OutputStream;
19 import java.util.LinkedList;
20 import java.util.Queue;
21 import java.util.concurrent.atomic.AtomicBoolean;
22 import org.opendaylight.controller.netconf.util.handler.ssh.virtualsocket.VirtualSocketException;
23
24
25 /**
26  * Worker thread class. Handles all downstream and upstream events in SSH Netty
27  * pipeline.
28  */
29 public class SshClientAdapter implements Runnable {
30     private final SshClient sshClient;
31     private final Invoker invoker;
32
33     private SshSession session;
34     private InputStream stdOut;
35     private InputStream stdErr;
36     private OutputStream stdIn;
37
38     private Queue<ByteBuf> postponned = new LinkedList<>();
39
40
41     private ChannelHandlerContext ctx;
42     private ChannelPromise disconnectPromise;
43
44     private final AtomicBoolean stopRequested = new AtomicBoolean(false);
45
46     private final Object lock = new Object();
47
48     public SshClientAdapter(SshClient sshClient, Invoker invoker) {
49         this.sshClient = sshClient;
50         this.invoker = invoker;
51     }
52
53     public void run() {
54         try {
55             session = sshClient.openSession();
56             invoker.invoke(session);
57             stdOut = session.getStdout();
58             stdErr = session.getStderr();
59
60             synchronized (lock) {
61
62                 stdIn = session.getStdin();
63                 ByteBuf message = null;
64                 while ((message = postponned.poll()) != null) {
65                     writeImpl(message);
66                 }
67             }
68
69             while (stopRequested.get() == false) {
70                 byte[] readBuff = new byte[1024];
71                 int c = stdOut.read(readBuff);
72                 if (c == -1) {
73                     continue;
74                 }
75                 byte[] tranBuff = new byte[c];
76                 System.arraycopy(readBuff, 0, tranBuff, 0, c);
77
78                 ByteBuf byteBuf = Unpooled.buffer(c);
79                 byteBuf.writeBytes(tranBuff);
80                 ctx.fireChannelRead(byteBuf);
81             }
82
83         } catch (VirtualSocketException e) {
84             // Netty closed connection prematurely.
85             // Just pass and move on.
86         } catch (Exception e) {
87             throw new RuntimeException(e);
88         } finally {
89             sshClient.close();
90
91             synchronized (lock) {
92                 if (disconnectPromise != null)
93                     ctx.disconnect(disconnectPromise);
94             }
95         }
96     }
97
98     // TODO: needs rework to match netconf framer API.
99     public void write(ByteBuf message) throws IOException {
100         synchronized (lock) {
101             if (stdIn == null) {
102                 postponned.add(message);
103                 return;
104             }
105             writeImpl(message);
106         }
107     }
108
109     private void writeImpl(ByteBuf message) throws IOException {
110         message.getBytes(0, stdIn, message.readableBytes());
111         stdIn.flush();
112     }
113
114     public void stop(ChannelPromise promise) {
115         synchronized (lock) {
116             stopRequested.set(true);
117             disconnectPromise = promise;
118         }
119     }
120
121     public void start(ChannelHandlerContext ctx) {
122         if (this.ctx != null)
123             return; // context is already associated.
124         this.ctx = ctx;
125         new Thread(this).start();
126     }
127 }