Bug 1362: New AsyncWriteTransaction#submit method
[controller.git] / opendaylight / netconf / netconf-netty-util / src / main / java / org / opendaylight / controller / netconf / nettyutil / handler / ssh / client / SshClientAdapter.java
1 /*
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.netconf.nettyutil.handler.ssh.client;
10
11 import io.netty.buffer.ByteBuf;
12 import io.netty.buffer.Unpooled;
13 import io.netty.channel.ChannelHandlerContext;
14 import io.netty.channel.ChannelPromise;
15 import java.io.IOException;
16 import java.io.InputStream;
17 import java.io.OutputStream;
18 import java.util.LinkedList;
19 import java.util.Queue;
20 import java.util.concurrent.atomic.AtomicBoolean;
21 import org.opendaylight.controller.netconf.nettyutil.handler.ssh.virtualsocket.VirtualSocketException;
22 import org.slf4j.Logger;
23 import org.slf4j.LoggerFactory;
24
25
26 /**
27  * Worker thread class. Handles all downstream and upstream events in SSH Netty
28  * pipeline.
29  */
30 public class SshClientAdapter implements Runnable {
31     private static final Logger logger = LoggerFactory.getLogger(SshClientAdapter.class);
32
33     private static final int BUFFER_SIZE = 1024;
34
35     private final SshClient sshClient;
36     private final Invoker invoker;
37
38     private OutputStream stdIn;
39
40     private Queue<ByteBuf> postponed = new LinkedList<>();
41
42     private ChannelHandlerContext ctx;
43     private ChannelPromise disconnectPromise;
44
45     private final AtomicBoolean stopRequested = new AtomicBoolean(false);
46
47     private final Object lock = new Object();
48
49     public SshClientAdapter(SshClient sshClient, Invoker invoker) {
50         this.sshClient = sshClient;
51         this.invoker = invoker;
52     }
53
54     public void run() {
55         try {
56             SshSession session = sshClient.openSession();
57             invoker.invoke(session);
58             InputStream stdOut = session.getStdout();
59             session.getStderr();
60
61             synchronized (lock) {
62
63                 stdIn = session.getStdin();
64                 ByteBuf message;
65                 while ((message = postponed.poll()) != null) {
66                     writeImpl(message);
67                 }
68             }
69
70             while (!stopRequested.get()) {
71                 byte[] readBuff = new byte[BUFFER_SIZE];
72                 int c = stdOut.read(readBuff);
73                 if (c == -1) {
74                     continue;
75                 }
76                 byte[] tranBuff = new byte[c];
77                 System.arraycopy(readBuff, 0, tranBuff, 0, c);
78
79                 ByteBuf byteBuf = Unpooled.buffer(c);
80                 byteBuf.writeBytes(tranBuff);
81                 ctx.fireChannelRead(byteBuf);
82             }
83
84         } catch (VirtualSocketException e) {
85             // Netty closed connection prematurely.
86             // Or maybe tried to open ganymed connection without having initialized session
87             // (ctx.channel().remoteAddress() is null)
88             // Just pass and move on.
89         } catch (Exception e) {
90             logger.error("Unexpected exception", e);
91         } finally {
92             sshClient.close();
93
94             synchronized (lock) {
95                 if (disconnectPromise != null) {
96                     ctx.disconnect(disconnectPromise);
97                 }
98             }
99         }
100     }
101
102     // TODO: needs rework to match netconf framer API.
103     public void write(ByteBuf message) throws IOException {
104         synchronized (lock) {
105             if (stdIn == null) {
106                 postponed.add(message);
107                 return;
108             }
109             writeImpl(message);
110         }
111     }
112
113     private void writeImpl(ByteBuf message) throws IOException {
114         message.getBytes(0, stdIn, message.readableBytes());
115         message.release();
116         stdIn.flush();
117     }
118
119     public void stop(ChannelPromise promise) {
120         synchronized (lock) {
121             stopRequested.set(true);
122             disconnectPromise = promise;
123         }
124     }
125
126     public void start(ChannelHandlerContext ctx) {
127         if (this.ctx != null) {
128             // context is already associated.
129             return;
130         }
131         this.ctx = ctx;
132         new Thread(this).start();
133     }
134 }