Merge "Fixed typo in SnapshotBackedWriteTransaction class"
[controller.git] / opendaylight / netconf / netconf-util / src / main / java / org / opendaylight / controller / netconf / util / handler / ssh / client / SshClientAdapter.java
1 /*
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.netconf.util.handler.ssh.client;
10
11 import io.netty.buffer.ByteBuf;
12 import io.netty.buffer.Unpooled;
13 import io.netty.channel.ChannelHandlerContext;
14 import io.netty.channel.ChannelPromise;
15
16 import java.io.IOException;
17 import java.io.InputStream;
18 import java.io.OutputStream;
19 import java.util.LinkedList;
20 import java.util.Queue;
21 import java.util.concurrent.atomic.AtomicBoolean;
22 import org.opendaylight.controller.netconf.util.handler.ssh.virtualsocket.VirtualSocketException;
23
24
25 /**
26  * Worker thread class. Handles all downstream and upstream events in SSH Netty
27  * pipeline.
28  */
29 public class SshClientAdapter implements Runnable {
30     private static final int BUFFER_SIZE = 1024;
31
32     private final SshClient sshClient;
33     private final Invoker invoker;
34
35     private OutputStream stdIn;
36
37     private Queue<ByteBuf> postponed = new LinkedList<>();
38
39     private ChannelHandlerContext ctx;
40     private ChannelPromise disconnectPromise;
41
42     private final AtomicBoolean stopRequested = new AtomicBoolean(false);
43
44     private final Object lock = new Object();
45
46     public SshClientAdapter(SshClient sshClient, Invoker invoker) {
47         this.sshClient = sshClient;
48         this.invoker = invoker;
49     }
50
51     public void run() {
52         try {
53             SshSession session = sshClient.openSession();
54             invoker.invoke(session);
55             InputStream stdOut = session.getStdout();
56             session.getStderr();
57
58             synchronized (lock) {
59
60                 stdIn = session.getStdin();
61                 ByteBuf message;
62                 while ((message = postponed.poll()) != null) {
63                     writeImpl(message);
64                 }
65             }
66
67             while (!stopRequested.get()) {
68                 byte[] readBuff = new byte[BUFFER_SIZE];
69                 int c = stdOut.read(readBuff);
70                 if (c == -1) {
71                     continue;
72                 }
73                 byte[] tranBuff = new byte[c];
74                 System.arraycopy(readBuff, 0, tranBuff, 0, c);
75
76                 ByteBuf byteBuf = Unpooled.buffer(c);
77                 byteBuf.writeBytes(tranBuff);
78                 ctx.fireChannelRead(byteBuf);
79             }
80
81         } catch (VirtualSocketException e) {
82             // Netty closed connection prematurely.
83             // Just pass and move on.
84         } catch (Exception e) {
85             throw new IllegalStateException(e);
86         } finally {
87             sshClient.close();
88
89             synchronized (lock) {
90                 if (disconnectPromise != null) {
91                     ctx.disconnect(disconnectPromise);
92                 }
93             }
94         }
95     }
96
97     // TODO: needs rework to match netconf framer API.
98     public void write(ByteBuf message) throws IOException {
99         synchronized (lock) {
100             if (stdIn == null) {
101                 postponed.add(message);
102                 return;
103             }
104             writeImpl(message);
105         }
106     }
107
108     private void writeImpl(ByteBuf message) throws IOException {
109         message.getBytes(0, stdIn, message.readableBytes());
110         stdIn.flush();
111     }
112
113     public void stop(ChannelPromise promise) {
114         synchronized (lock) {
115             stopRequested.set(true);
116             disconnectPromise = promise;
117         }
118     }
119
120     public void start(ChannelHandlerContext ctx) {
121         if (this.ctx != null) {
122             // context is already associated.
123             return;
124         }
125         this.ctx = ctx;
126         new Thread(this).start();
127     }
128 }