Bug 1586: Do not use JaxRS 2.0 unnecessarily
[controller.git] / opendaylight / netconf / netconf-netty-util / src / main / java / org / opendaylight / controller / netconf / nettyutil / handler / ssh / client / SshClientAdapter.java
1 /*
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.netconf.nettyutil.handler.ssh.client;
10
11 import static com.google.common.base.Preconditions.checkArgument;
12 import static com.google.common.base.Preconditions.checkNotNull;
13 import static com.google.common.base.Preconditions.checkState;
14
15 import io.netty.buffer.ByteBuf;
16 import io.netty.buffer.Unpooled;
17 import io.netty.channel.ChannelFuture;
18 import io.netty.channel.ChannelHandlerContext;
19 import io.netty.channel.ChannelPromise;
20 import java.io.IOException;
21 import java.io.InputStream;
22 import java.io.OutputStream;
23 import java.util.LinkedList;
24 import java.util.Queue;
25 import java.util.concurrent.atomic.AtomicBoolean;
26 import org.slf4j.Logger;
27 import org.slf4j.LoggerFactory;
28
29
30 /**
31  * Worker thread class. Handles all downstream and upstream events in SSH Netty
32  * pipeline.
33  */
34 class SshClientAdapter implements Runnable {
35     private static final Logger logger = LoggerFactory.getLogger(SshClientAdapter.class);
36
37     private static final int BUFFER_SIZE = 1024;
38
39     private final SshClient sshClient;
40     private final Invoker invoker;
41
42     private OutputStream stdIn;
43
44     private Queue<ByteBuf> postponed = new LinkedList<>();
45
46     private ChannelHandlerContext ctx;
47     private ChannelPromise disconnectPromise;
48
49     private final AtomicBoolean stopRequested = new AtomicBoolean(false);
50
51     private final Object lock = new Object();
52
53     public SshClientAdapter(SshClient sshClient, Invoker invoker) {
54         this.sshClient = sshClient;
55         this.invoker = invoker;
56     }
57
58     // TODO: refactor
59     public void run() {
60         try {
61             SshSession session = sshClient.openSession();
62             invoker.invoke(session);
63             InputStream stdOut = session.getStdout();
64             session.getStderr();
65
66             synchronized (lock) {
67
68                 stdIn = session.getStdin();
69                 ByteBuf message;
70                 while ((message = postponed.poll()) != null) {
71                     writeImpl(message);
72                 }
73             }
74
75             while (!stopRequested.get()) {
76                 byte[] readBuff = new byte[BUFFER_SIZE];
77                 int c = stdOut.read(readBuff);
78                 if (c == -1) {
79                     continue;
80                 }
81                 byte[] tranBuff = new byte[c];
82                 System.arraycopy(readBuff, 0, tranBuff, 0, c);
83
84                 ByteBuf byteBuf = Unpooled.buffer(c);
85                 byteBuf.writeBytes(tranBuff);
86                 ctx.fireChannelRead(byteBuf);
87             }
88         } catch (Exception e) {
89             logger.error("Unexpected exception", e);
90         } finally {
91             sshClient.close();
92
93             synchronized (lock) {
94                 if (disconnectPromise != null) {
95                     ctx.disconnect(disconnectPromise);
96                 }
97             }
98         }
99     }
100
101     // TODO: needs rework to match netconf framer API.
102     public void write(ByteBuf message) throws IOException {
103         synchronized (lock) {
104             if (stdIn == null) {
105                 postponed.add(message);
106                 return;
107             }
108             writeImpl(message);
109         }
110     }
111
112     private void writeImpl(ByteBuf message) throws IOException {
113         message.getBytes(0, stdIn, message.readableBytes());
114         message.release();
115         stdIn.flush();
116     }
117
118     public void stop(ChannelPromise promise) {
119         synchronized (lock) {
120             stopRequested.set(true);
121             disconnectPromise = promise;
122         }
123     }
124
125     public Thread start(ChannelHandlerContext ctx, ChannelFuture channelFuture) {
126         checkArgument(channelFuture.isSuccess());
127         checkNotNull(ctx.channel().remoteAddress());
128         synchronized (this) {
129             checkState(this.ctx == null);
130             this.ctx = ctx;
131         }
132         String threadName = toString();
133         Thread thread = new Thread(this, threadName);
134         thread.start();
135         return thread;
136     }
137
138     @Override
139     public String toString() {
140         return "SshClientAdapter{" +
141                 "sshClient=" + sshClient +
142                 '}';
143     }
144 }