HoneyNode Java 11 support for 221 devices
[transportpce.git] / tests / honeynode / 2.1 / netconf-netty-util / src / main / java / org / opendaylight / netconf / nettyutil / handler / ssh / client / AsyncSshHandler.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.netconf.nettyutil.handler.ssh.client;
10
11 import com.google.common.base.Preconditions;
12 import io.netty.buffer.ByteBuf;
13 import io.netty.channel.ChannelHandlerContext;
14 import io.netty.channel.ChannelOutboundHandlerAdapter;
15 import io.netty.channel.ChannelPromise;
16 import io.netty.util.concurrent.Future;
17 import io.netty.util.concurrent.GenericFutureListener;
18 import java.io.IOException;
19 import java.net.SocketAddress;
20 import java.util.HashMap;
21 import java.util.Map;
22 import org.apache.sshd.ClientChannel;
23 import org.apache.sshd.ClientSession;
24 import org.apache.sshd.SshClient;
25 import org.apache.sshd.client.future.AuthFuture;
26 import org.apache.sshd.client.future.ConnectFuture;
27 import org.apache.sshd.client.future.OpenFuture;
28 import org.apache.sshd.common.future.CloseFuture;
29 import org.apache.sshd.common.future.SshFutureListener;
30 import org.opendaylight.netconf.nettyutil.handler.ssh.authentication.AuthenticationHandler;
31 import org.slf4j.Logger;
32 import org.slf4j.LoggerFactory;
33
34 /**
35  * Netty SSH handler class. Acts as interface between Netty and SSH library.
36  */
37 public class AsyncSshHandler extends ChannelOutboundHandlerAdapter {
38     private static final Logger LOG = LoggerFactory.getLogger(AsyncSshHandler.class);
39
40     public static final String SUBSYSTEM = "netconf";
41
42     public static final int SSH_DEFAULT_NIO_WORKERS = 8;
43     // Disable default timeouts from mina sshd
44     private static final long DEFAULT_TIMEOUT = -1L;
45
46     public static final SshClient DEFAULT_CLIENT;
47
48     static {
49         final Map<String, String> props = new HashMap<>();
50         props.put(SshClient.AUTH_TIMEOUT, Long.toString(DEFAULT_TIMEOUT));
51         props.put(SshClient.IDLE_TIMEOUT, Long.toString(DEFAULT_TIMEOUT));
52
53         final SshClient c = SshClient.setUpDefaultClient();
54
55         c.setProperties(props);
56         // TODO make configurable, or somehow reuse netty threadpool
57         c.setNioWorkers(SSH_DEFAULT_NIO_WORKERS);
58         c.start();
59         DEFAULT_CLIENT = c;
60     }
61
62     private final AuthenticationHandler authenticationHandler;
63     private final SshClient sshClient;
64     private Future<?> negotiationFuture;
65
66     private AsyncSshHandlerReader sshReadAsyncListener;
67     private AsyncSshHandlerWriter sshWriteAsyncHandler;
68
69     private ClientChannel channel;
70     private ClientSession session;
71     private ChannelPromise connectPromise;
72     private GenericFutureListener negotiationFutureListener;
73
74     public AsyncSshHandler(final AuthenticationHandler authenticationHandler, final SshClient sshClient,
75             final Future<?> negotiationFuture) throws IOException {
76         this(authenticationHandler, sshClient);
77         this.negotiationFuture = negotiationFuture;
78     }
79
80     /**
81      * Constructor of {@code AsyncSshHandler}.
82      *
83      * @param authenticationHandler authentication handler
84      * @param sshClient             started SshClient
85      * @throws IOException          if the I/O operation fails
86      */
87     public AsyncSshHandler(final AuthenticationHandler authenticationHandler,
88                            final SshClient sshClient) throws IOException {
89         this.authenticationHandler = Preconditions.checkNotNull(authenticationHandler);
90         this.sshClient = Preconditions.checkNotNull(sshClient);
91     }
92
93     public static AsyncSshHandler createForNetconfSubsystem(final AuthenticationHandler authenticationHandler)
94             throws IOException {
95         return new AsyncSshHandler(authenticationHandler, DEFAULT_CLIENT);
96     }
97
98     /**
99      * Create AsyncSshHandler for netconf subsystem. Negotiation future has to be set to success after successful
100      * netconf negotiation.
101      *
102      * @param authenticationHandler authentication handler
103      * @param negotiationFuture     negotiation future
104      * @return                      {@code AsyncSshHandler}
105      * @throws IOException          if the I/O operation fails
106      */
107     public static AsyncSshHandler createForNetconfSubsystem(final AuthenticationHandler authenticationHandler,
108             final Future<?> negotiationFuture) throws IOException {
109         return new AsyncSshHandler(authenticationHandler, DEFAULT_CLIENT, negotiationFuture);
110     }
111
112     private void startSsh(final ChannelHandlerContext ctx, final SocketAddress address) {
113         LOG.debug("Starting SSH to {} on channel: {}", address, ctx.channel());
114
115         final ConnectFuture sshConnectionFuture = sshClient.connect(authenticationHandler.getUsername(), address);
116         sshConnectionFuture.addListener(new SshFutureListener<ConnectFuture>() {
117             @Override
118             public void operationComplete(final ConnectFuture future) {
119                 if (future.isConnected()) {
120                     handleSshSessionCreated(future, ctx);
121                 } else {
122                     handleSshSetupFailure(ctx, future.getException());
123                 }
124             }
125         });
126     }
127
128     private synchronized void handleSshSessionCreated(final ConnectFuture future, final ChannelHandlerContext ctx) {
129         try {
130             LOG.trace("SSH session created on channel: {}", ctx.channel());
131
132             session = future.getSession();
133             final AuthFuture authenticateFuture = authenticationHandler.authenticate(session);
134             authenticateFuture.addListener(new SshFutureListener<AuthFuture>() {
135                 @Override
136                 public void operationComplete(final AuthFuture future) {
137                     if (future.isSuccess()) {
138                         handleSshAuthenticated(session, ctx);
139                     } else {
140                         // Exception does not have to be set in the future, add simple exception in such case
141                         final Throwable exception = future.getException() == null
142                                 ? new IllegalStateException("Authentication failed") : future.getException();
143                         handleSshSetupFailure(ctx, exception);
144                     }
145                 }
146             });
147         } catch (final IOException e) {
148             handleSshSetupFailure(ctx, e);
149         }
150     }
151
152     private synchronized void handleSshAuthenticated(final ClientSession session, final ChannelHandlerContext ctx) {
153         try {
154             LOG.debug("SSH session authenticated on channel: {}, server version: {}", ctx.channel(),
155                     session.getServerVersion());
156
157             channel = session.createSubsystemChannel(SUBSYSTEM);
158             channel.setStreaming(ClientChannel.Streaming.Async);
159             channel.open().addListener(new SshFutureListener<OpenFuture>() {
160                 @Override
161                 public void operationComplete(final OpenFuture future) {
162                     if (future.isOpened()) {
163                         handleSshChanelOpened(ctx);
164                     } else {
165                         handleSshSetupFailure(ctx, future.getException());
166                     }
167                 }
168             });
169
170
171         } catch (final IOException e) {
172             handleSshSetupFailure(ctx, e);
173         }
174     }
175
176     private synchronized void handleSshChanelOpened(final ChannelHandlerContext ctx) {
177         LOG.trace("SSH subsystem channel opened successfully on channel: {}", ctx.channel());
178
179         if (negotiationFuture == null) {
180             connectPromise.setSuccess();
181         }
182
183         // TODO we should also read from error stream and at least log from that
184
185         sshReadAsyncListener = new AsyncSshHandlerReader(new AutoCloseable() {
186             @Override
187             public void close() throws Exception {
188                 AsyncSshHandler.this.disconnect(ctx, ctx.newPromise());
189             }
190         }, new AsyncSshHandlerReader.ReadMsgHandler() {
191             @Override
192             public void onMessageRead(final ByteBuf msg) {
193                 ctx.fireChannelRead(msg);
194             }
195         }, channel.toString(), channel.getAsyncOut());
196
197         // if readAsyncListener receives immediate close,
198         // it will close this handler and closing this handler sets channel variable to null
199         if (channel != null) {
200             sshWriteAsyncHandler = new AsyncSshHandlerWriter(channel.getAsyncIn());
201             ctx.fireChannelActive();
202         }
203     }
204
205     private synchronized void handleSshSetupFailure(final ChannelHandlerContext ctx, final Throwable error) {
206         LOG.warn("Unable to setup SSH connection on channel: {}", ctx.channel(), error);
207
208         // If the promise is not yet done, we have failed with initial connect and set connectPromise to failure
209         if (!connectPromise.isDone()) {
210             connectPromise.setFailure(error);
211         }
212
213         disconnect(ctx, ctx.newPromise());
214     }
215
216     @Override
217     public synchronized void write(final ChannelHandlerContext ctx, final Object msg, final ChannelPromise promise) {
218         sshWriteAsyncHandler.write(ctx, msg, promise);
219     }
220
221     @Override
222     public synchronized void connect(final ChannelHandlerContext ctx, final SocketAddress remoteAddress,
223                                      final SocketAddress localAddress, final ChannelPromise promise) throws Exception {
224         LOG.debug("SSH session connecting on channel {}. promise: {} ", ctx.channel(), connectPromise);
225         this.connectPromise = promise;
226
227         if (negotiationFuture != null) {
228
229             negotiationFutureListener = new GenericFutureListener<Future<?>>() {
230                 @Override
231                 public void operationComplete(final Future<?> future) {
232                     if (future.isSuccess()) {
233                         connectPromise.setSuccess();
234                     }
235                 }
236             };
237             //complete connection promise with netconf negotiation future
238             negotiationFuture.addListener(negotiationFutureListener);
239         }
240         startSsh(ctx, remoteAddress);
241     }
242
243     @Override
244     public void close(final ChannelHandlerContext ctx, final ChannelPromise promise) throws Exception {
245         disconnect(ctx, promise);
246     }
247
248     @SuppressWarnings("checkstyle:IllegalCatch")
249     @Override
250     public synchronized void disconnect(final ChannelHandlerContext ctx, final ChannelPromise promise) {
251         LOG.trace("Closing SSH session on channel: {} with connect promise in state: {}",
252                 ctx.channel(),connectPromise);
253
254         // If we have already succeeded and the session was dropped after,
255         // we need to fire inactive to notify reconnect logic
256         if (connectPromise.isSuccess()) {
257             ctx.fireChannelInactive();
258         }
259
260         if (sshWriteAsyncHandler != null) {
261             sshWriteAsyncHandler.close();
262         }
263
264         if (sshReadAsyncListener != null) {
265             sshReadAsyncListener.close();
266         }
267
268         //If connection promise is not already set, it means negotiation failed
269         //we must set connection promise to failure
270         if (!connectPromise.isDone()) {
271             connectPromise.setFailure(new IllegalStateException("Negotiation failed"));
272         }
273
274         //Remove listener from negotiation future, we don't want notifications
275         //from negotiation anymore
276         if (negotiationFuture != null) {
277             negotiationFuture.removeListener(negotiationFutureListener);
278         }
279
280         if (session != null && !session.isClosed() && !session.isClosing()) {
281             session.close(false).addListener(new SshFutureListener<CloseFuture>() {
282                 @Override
283                 public void operationComplete(final CloseFuture future) {
284                     if (future.isClosed() == false) {
285                         session.close(true);
286                     }
287                     session = null;
288                 }
289             });
290         }
291
292         // Super disconnect is necessary in this case since we are using NioSocketChannel and it needs
293         // to cleanup its resources e.g. Socket that it tries to open in its constructor
294         // (https://bugs.opendaylight.org/show_bug.cgi?id=2430)
295         // TODO better solution would be to implement custom ChannelFactory + Channel
296         // that will use mina SSH lib internally: port this to custom channel implementation
297         try {
298             // Disconnect has to be closed after inactive channel event was fired, because it interferes with it
299             super.disconnect(ctx, ctx.newPromise());
300         } catch (final Exception e) {
301             LOG.warn("Unable to cleanup all resources for channel: {}. Ignoring.", ctx.channel(), e);
302         }
303
304         channel = null;
305         promise.setSuccess();
306         LOG.debug("SSH session closed on channel: {}", ctx.channel());
307     }
308
309 }