Merge "Bug 1586: Do not use JaxRS 2.0 unnecessarily"
[controller.git] / opendaylight / netconf / netconf-netty-util / src / main / java / org / opendaylight / controller / netconf / nettyutil / handler / ssh / client / SshClientAdapter.java
1 /*
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.netconf.nettyutil.handler.ssh.client;
10
11 import static com.google.common.base.Preconditions.checkArgument;
12 import static com.google.common.base.Preconditions.checkNotNull;
13 import static com.google.common.base.Preconditions.checkState;
14
15 import io.netty.buffer.ByteBuf;
16 import io.netty.buffer.Unpooled;
17 import io.netty.channel.ChannelFuture;
18 import io.netty.channel.ChannelHandlerContext;
19 import io.netty.channel.ChannelPromise;
20 import java.io.IOException;
21 import java.io.InputStream;
22 import java.io.OutputStream;
23 import java.util.LinkedList;
24 import java.util.Queue;
25 import java.util.concurrent.atomic.AtomicBoolean;
26 import org.slf4j.Logger;
27 import org.slf4j.LoggerFactory;
28
29
30 /**
31  * Worker thread class. Handles all downstream and upstream events in SSH Netty
32  * pipeline.
33  */
34 class SshClientAdapter implements Runnable {
35     private static final Logger logger = LoggerFactory.getLogger(SshClientAdapter.class);
36
37     private static final int BUFFER_SIZE = 1024;
38
39     private final SshClient sshClient;
40     private final Invoker invoker;
41
42     private OutputStream stdIn;
43
44     private final Queue<ByteBuf> postponed = new LinkedList<>();
45
46     private ChannelHandlerContext ctx;
47     private ChannelPromise disconnectPromise;
48
49     private final AtomicBoolean stopRequested = new AtomicBoolean(false);
50
51     private final Object lock = new Object();
52
53     public SshClientAdapter(final SshClient sshClient, final Invoker invoker) {
54         this.sshClient = sshClient;
55         this.invoker = invoker;
56     }
57
58     // TODO ganymed spawns a Thread that receives the data from remote inside TransportManager
59     // Get rid of this thread and reuse Ganymed internal thread (not sure if its possible without modifications in ganymed)
60     public void run() {
61         try {
62             final SshSession session = sshClient.openSession();
63             invoker.invoke(session);
64             final InputStream stdOut = session.getStdout();
65
66             synchronized (lock) {
67                 stdIn = session.getStdin();
68                 while (postponed.peek() != null) {
69                     writeImpl(postponed.poll());
70                 }
71             }
72
73             while (!stopRequested.get()) {
74                 final byte[] readBuff = new byte[BUFFER_SIZE];
75                 final int c = stdOut.read(readBuff);
76                 if (c == -1) {
77                     continue;
78                 }
79
80                 ctx.fireChannelRead(Unpooled.copiedBuffer(readBuff, 0, c));
81             }
82         } catch (final Exception e) {
83             logger.error("Unexpected exception", e);
84         } finally {
85             sshClient.close();
86
87             synchronized (lock) {
88                 if (disconnectPromise != null) {
89                     ctx.disconnect(disconnectPromise);
90                 }
91             }
92         }
93     }
94
95     // TODO: needs rework to match netconf framer API.
96     public void write(final ByteBuf message) throws IOException {
97         synchronized (lock) {
98             if (stdIn == null) {
99                 postponed.add(message);
100                 return;
101             }
102             writeImpl(message);
103         }
104     }
105
106     private void writeImpl(final ByteBuf message) throws IOException {
107         message.getBytes(0, stdIn, message.readableBytes());
108         message.release();
109         stdIn.flush();
110     }
111
112     public void stop(final ChannelPromise promise) {
113         synchronized (lock) {
114             stopRequested.set(true);
115             disconnectPromise = promise;
116         }
117     }
118
119     public Thread start(final ChannelHandlerContext ctx, final ChannelFuture channelFuture) {
120         checkArgument(channelFuture.isSuccess());
121         checkNotNull(ctx.channel().remoteAddress());
122         synchronized (this) {
123             checkState(this.ctx == null);
124             this.ctx = ctx;
125         }
126         final String threadName = toString();
127         final Thread thread = new Thread(this, threadName);
128         thread.start();
129         return thread;
130     }
131
132     @Override
133     public String toString() {
134         return "SshClientAdapter{" +
135                 "sshClient=" + sshClient +
136                 '}';
137     }
138 }