Merge "Added DELETE support for Bridge and Port resources"
[controller.git] / opendaylight / netconf / netconf-util / src / main / java / org / opendaylight / controller / netconf / util / handler / ssh / client / SshClientAdapter.java
1 /*
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.netconf.util.handler.ssh.client;
10
11 import io.netty.buffer.ByteBuf;
12 import io.netty.buffer.Unpooled;
13 import io.netty.channel.ChannelHandlerContext;
14 import io.netty.channel.ChannelPromise;
15 import java.io.IOException;
16 import java.io.InputStream;
17 import java.io.OutputStream;
18 import java.util.concurrent.atomic.AtomicBoolean;
19 import org.opendaylight.controller.netconf.util.handler.ssh.virtualsocket.VirtualSocketException;
20
21 /**
22  * Worker thread class. Handles all downstream and upstream events in SSH Netty pipeline.
23  */
24 public class SshClientAdapter implements Runnable {
25     private final SshClient sshClient;
26     private final Invoker invoker;
27
28     private SshSession session;
29     private InputStream stdOut;
30     private InputStream stdErr;
31     private OutputStream stdIn;
32
33     private ChannelHandlerContext ctx;
34     private ChannelPromise disconnectPromise;
35
36     private final AtomicBoolean stopRequested = new AtomicBoolean(false);
37
38     private final Object lock = new Object();
39
40     public SshClientAdapter(SshClient sshClient,
41                             Invoker invoker) {
42         this.sshClient = sshClient;
43         this.invoker = invoker;
44     }
45
46     public void run() {
47         try {
48             session = sshClient.openSession();
49             invoker.invoke(session);
50
51             stdOut = session.getStdout();
52             stdErr = session.getStderr();
53
54             synchronized(lock) {
55                 stdIn = session.getStdin();
56             }
57
58             while (stopRequested.get() == false) {
59                 byte[] readBuff = new byte[1024];
60                 int c = stdOut.read(readBuff);
61
62                 byte[] tranBuff = new byte[c];
63                 System.arraycopy(readBuff, 0, tranBuff, 0, c);
64
65                 ByteBuf byteBuf = Unpooled.buffer(c);
66                 byteBuf.writeBytes(tranBuff);
67                 ctx.fireChannelRead(byteBuf);
68             }
69
70         } catch (VirtualSocketException e) {
71             // Netty closed connection prematurely.
72             // Just pass and move on.
73         } catch (Exception e) {
74             throw new RuntimeException(e);
75         } finally {
76             sshClient.close();
77
78             synchronized (lock) {
79                 if(disconnectPromise != null) ctx.disconnect(disconnectPromise);
80             }
81         }
82     }
83
84     // TODO: needs rework to match netconf framer API.
85     public void write(String message) throws IOException {
86         synchronized (lock) {
87             if (stdIn == null) throw new IllegalStateException("StdIn not available");
88         }
89         stdIn.write(message.getBytes());
90         stdIn.flush();
91     }
92
93     public void stop(ChannelPromise promise) {
94         synchronized (lock) {
95             stopRequested.set(true);
96             disconnectPromise = promise;
97         }
98     }
99
100     public void start(ChannelHandlerContext ctx) {
101         if(this.ctx != null) return; // context is already associated.
102
103         this.ctx = ctx;
104         new Thread(this).start();
105     }
106 }