b3f085b2930d83507eebddde3e26e877400c964d
[netconf.git] / netconf / netconf-netty-util / src / test / java / org / opendaylight / netconf / nettyutil / handler / ssh / client / AsyncSshHandlerTest.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.netconf.nettyutil.handler.ssh.client;
9
10 import static org.mockito.ArgumentMatchers.any;
11 import static org.mockito.ArgumentMatchers.anyBoolean;
12 import static org.mockito.ArgumentMatchers.eq;
13 import static org.mockito.Mockito.doAnswer;
14 import static org.mockito.Mockito.doNothing;
15 import static org.mockito.Mockito.doReturn;
16 import static org.mockito.Mockito.doThrow;
17 import static org.mockito.Mockito.mock;
18 import static org.mockito.Mockito.spy;
19 import static org.mockito.Mockito.times;
20 import static org.mockito.Mockito.verify;
21 import static org.mockito.Mockito.verifyNoMoreInteractions;
22
23 import com.google.common.util.concurrent.FutureCallback;
24 import com.google.common.util.concurrent.Futures;
25 import com.google.common.util.concurrent.ListenableFuture;
26 import com.google.common.util.concurrent.MoreExecutors;
27 import com.google.common.util.concurrent.SettableFuture;
28 import io.netty.buffer.Unpooled;
29 import io.netty.channel.Channel;
30 import io.netty.channel.ChannelConfig;
31 import io.netty.channel.ChannelFuture;
32 import io.netty.channel.ChannelHandlerContext;
33 import io.netty.channel.ChannelPromise;
34 import io.netty.channel.DefaultChannelPromise;
35 import io.netty.util.concurrent.EventExecutor;
36 import java.io.IOException;
37 import java.net.SocketAddress;
38 import java.util.concurrent.TimeUnit;
39 import org.junit.After;
40 import org.junit.Before;
41 import org.junit.Ignore;
42 import org.junit.Test;
43 import org.junit.runner.RunWith;
44 import org.mockito.Mock;
45 import org.mockito.junit.MockitoJUnitRunner;
46 import org.opendaylight.netconf.nettyutil.handler.ssh.authentication.AuthenticationHandler;
47 import org.opendaylight.netconf.shaded.sshd.client.channel.ClientChannel;
48 import org.opendaylight.netconf.shaded.sshd.client.future.AuthFuture;
49 import org.opendaylight.netconf.shaded.sshd.client.future.ConnectFuture;
50 import org.opendaylight.netconf.shaded.sshd.client.future.OpenFuture;
51 import org.opendaylight.netconf.shaded.sshd.client.session.ClientSession;
52 import org.opendaylight.netconf.shaded.sshd.common.future.CloseFuture;
53 import org.opendaylight.netconf.shaded.sshd.common.future.SshFuture;
54 import org.opendaylight.netconf.shaded.sshd.common.future.SshFutureListener;
55 import org.opendaylight.netconf.shaded.sshd.common.io.IoInputStream;
56 import org.opendaylight.netconf.shaded.sshd.common.io.IoOutputStream;
57 import org.opendaylight.netconf.shaded.sshd.common.io.IoReadFuture;
58 import org.opendaylight.netconf.shaded.sshd.common.io.IoWriteFuture;
59 import org.opendaylight.netconf.shaded.sshd.common.io.WritePendingException;
60 import org.opendaylight.netconf.shaded.sshd.common.util.buffer.Buffer;
61 import org.opendaylight.netconf.shaded.sshd.common.util.buffer.ByteArrayBuffer;
62
63 @RunWith(MockitoJUnitRunner.StrictStubs.class)
64 public class AsyncSshHandlerTest {
65
66     @Mock
67     private NetconfSshClient sshClient;
68     @Mock
69     private AuthenticationHandler authHandler;
70     @Mock
71     private ChannelHandlerContext ctx;
72     @Mock
73     private Channel channel;
74     @Mock
75     private SocketAddress remoteAddress;
76     @Mock
77     private SocketAddress localAddress;
78     @Mock
79     private ChannelConfig channelConfig;
80     @Mock
81     private EventExecutor executor;
82
83     private AsyncSshHandler asyncSshHandler;
84
85     private SshFutureListener<ConnectFuture> sshConnectListener;
86     private SshFutureListener<AuthFuture> sshAuthListener;
87     private SshFutureListener<OpenFuture> sshChannelOpenListener;
88     private ChannelPromise promise;
89
90     @Before
91     public void setUp() throws Exception {
92         stubAuth();
93         stubSshClient();
94         stubChannel();
95         stubCtx();
96
97         promise = getMockedPromise();
98
99         asyncSshHandler = new AsyncSshHandler(authHandler, sshClient);
100     }
101
102     @After
103     public void tearDown() throws Exception {
104         sshConnectListener = null;
105         sshAuthListener = null;
106         sshChannelOpenListener = null;
107         promise = null;
108         asyncSshHandler.close(ctx, getMockedPromise());
109     }
110
111     private void stubAuth() throws IOException {
112         doReturn("usr").when(authHandler).getUsername();
113
114         final AuthFuture authFuture = mock(AuthFuture.class);
115         Futures.addCallback(stubAddListener(authFuture), new SuccessFutureListener<AuthFuture>() {
116             @Override
117             public void onSuccess(final SshFutureListener<AuthFuture> result) {
118                 sshAuthListener = result;
119             }
120         }, MoreExecutors.directExecutor());
121         doReturn(authFuture).when(authHandler).authenticate(any(ClientSession.class));
122     }
123
124     @SuppressWarnings("unchecked")
125     private static <T extends SshFuture<T>> ListenableFuture<SshFutureListener<T>> stubAddListener(final T future) {
126         final SettableFuture<SshFutureListener<T>> listenerSettableFuture = SettableFuture.create();
127
128         doAnswer(invocation -> {
129             listenerSettableFuture.set((SshFutureListener<T>) invocation.getArguments()[0]);
130             return null;
131         }).when(future).addListener(any(SshFutureListener.class));
132
133         return listenerSettableFuture;
134     }
135
136     private void stubCtx() {
137         doReturn(channel).when(ctx).channel();
138         doReturn(ctx).when(ctx).fireChannelActive();
139         doReturn(ctx).when(ctx).fireChannelInactive();
140         doReturn(mock(ChannelFuture.class)).when(ctx).disconnect(any(ChannelPromise.class));
141         doReturn(getMockedPromise()).when(ctx).newPromise();
142         doReturn(executor).when(ctx).executor();
143         doAnswer(invocation -> {
144             invocation.getArgument(0, Runnable.class).run();
145             return null;
146         }).when(executor).execute(any());
147     }
148
149     private void stubChannel() {
150         doReturn("channel").when(channel).toString();
151     }
152
153     private void stubSshClient() throws IOException {
154         final ConnectFuture connectFuture = mock(ConnectFuture.class);
155         Futures.addCallback(stubAddListener(connectFuture), new SuccessFutureListener<ConnectFuture>() {
156             @Override
157             public void onSuccess(final SshFutureListener<ConnectFuture> result) {
158                 sshConnectListener = result;
159             }
160         }, MoreExecutors.directExecutor());
161         doReturn(connectFuture).when(sshClient).connect("usr", remoteAddress);
162         doReturn(channelConfig).when(channel).config();
163         doReturn(1).when(channelConfig).getConnectTimeoutMillis();
164         doReturn(connectFuture).when(connectFuture).verify(1,TimeUnit.MILLISECONDS);
165     }
166
167     @Test
168     public void testConnectSuccess() throws Exception {
169         asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
170
171         final IoInputStream asyncOut = getMockedIoInputStream();
172         final IoOutputStream asyncIn = getMockedIoOutputStream();
173         final NettyAwareChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
174         final ClientSession sshSession = getMockedSshSession(subsystemChannel);
175         final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
176
177         sshConnectListener.operationComplete(connectFuture);
178         sshAuthListener.operationComplete(getSuccessAuthFuture());
179         sshChannelOpenListener.operationComplete(getSuccessOpenFuture());
180
181         verify(subsystemChannel).setStreaming(ClientChannel.Streaming.Async);
182
183         verify(promise).setSuccess();
184         verify(ctx).fireChannelActive();
185         asyncSshHandler.close(ctx, getMockedPromise());
186         verify(ctx).fireChannelInactive();
187     }
188
189     @Test
190     public void testWrite() throws Exception {
191         asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
192
193         final IoInputStream asyncOut = getMockedIoInputStream();
194         final IoOutputStream asyncIn = getMockedIoOutputStream();
195         final NettyAwareChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
196         final ClientSession sshSession = getMockedSshSession(subsystemChannel);
197         final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
198
199         sshConnectListener.operationComplete(connectFuture);
200         sshAuthListener.operationComplete(getSuccessAuthFuture());
201         sshChannelOpenListener.operationComplete(getSuccessOpenFuture());
202
203         final ChannelPromise writePromise = getMockedPromise();
204         asyncSshHandler.write(ctx, Unpooled.copiedBuffer(new byte[]{0, 1, 2, 3, 4, 5}), writePromise);
205
206         verify(writePromise).setSuccess();
207     }
208
209     @Test
210     public void testWriteClosed() throws Exception {
211         asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
212
213         final IoInputStream asyncOut = getMockedIoInputStream();
214         final IoOutputStream asyncIn = getMockedIoOutputStream();
215
216         final IoWriteFuture ioWriteFuture = asyncIn.writeBuffer(new ByteArrayBuffer());
217
218         Futures.addCallback(stubAddListener(ioWriteFuture), new SuccessFutureListener<IoWriteFuture>() {
219             @Override
220             public void onSuccess(final SshFutureListener<IoWriteFuture> result) {
221                 doReturn(false).when(ioWriteFuture).isWritten();
222                 doReturn(new IllegalStateException()).when(ioWriteFuture).getException();
223                 result.operationComplete(ioWriteFuture);
224             }
225         }, MoreExecutors.directExecutor());
226
227         final NettyAwareChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
228         final ClientSession sshSession = getMockedSshSession(subsystemChannel);
229         final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
230
231         sshConnectListener.operationComplete(connectFuture);
232         sshAuthListener.operationComplete(getSuccessAuthFuture());
233         sshChannelOpenListener.operationComplete(getSuccessOpenFuture());
234
235         final ChannelPromise writePromise = getMockedPromise();
236         asyncSshHandler.write(ctx, Unpooled.copiedBuffer(new byte[]{0,1,2,3,4,5}), writePromise);
237
238         verify(writePromise).setFailure(any(Throwable.class));
239     }
240
241     @Test
242     public void testWritePendingOne() throws Exception {
243         asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
244
245         final IoInputStream asyncOut = getMockedIoInputStream();
246         final IoOutputStream asyncIn = getMockedIoOutputStream();
247         final IoWriteFuture ioWriteFuture = asyncIn.writeBuffer(new ByteArrayBuffer());
248
249         final NettyAwareChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
250         final ClientSession sshSession = getMockedSshSession(subsystemChannel);
251         final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
252
253         sshConnectListener.operationComplete(connectFuture);
254         sshAuthListener.operationComplete(getSuccessAuthFuture());
255         sshChannelOpenListener.operationComplete(getSuccessOpenFuture());
256
257         final ChannelPromise firstWritePromise = getMockedPromise();
258
259         // intercept listener for first write,
260         // so we can invoke successful write later thus simulate pending of the first write
261         final ListenableFuture<SshFutureListener<IoWriteFuture>> firstWriteListenerFuture =
262                 stubAddListener(ioWriteFuture);
263         asyncSshHandler.write(ctx, Unpooled.copiedBuffer(new byte[]{0,1,2,3,4,5}), firstWritePromise);
264         final SshFutureListener<IoWriteFuture> firstWriteListener = firstWriteListenerFuture.get();
265         // intercept second listener,
266         // this is the listener for pending write for the pending write to know when pending state ended
267         final ListenableFuture<SshFutureListener<IoWriteFuture>> pendingListener = stubAddListener(ioWriteFuture);
268
269         final ChannelPromise secondWritePromise = getMockedPromise();
270         asyncSshHandler.write(ctx, Unpooled.copiedBuffer(new byte[]{0, 1, 2, 3, 4, 5}), secondWritePromise);
271
272         doReturn(ioWriteFuture).when(asyncIn).writeBuffer(any(Buffer.class));
273
274         verifyNoMoreInteractions(firstWritePromise, secondWritePromise);
275
276         // make first write stop pending
277         firstWriteListener.operationComplete(ioWriteFuture);
278
279         // notify listener for second write that pending has ended
280         pendingListener.get().operationComplete(ioWriteFuture);
281
282         // verify both write promises successful
283         verify(firstWritePromise).setSuccess();
284         verify(secondWritePromise).setSuccess();
285     }
286
287     @Ignore("Pending queue is not limited")
288     @Test
289     public void testWritePendingMax() throws Exception {
290         asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
291
292         final IoInputStream asyncOut = getMockedIoInputStream();
293         final IoOutputStream asyncIn = getMockedIoOutputStream();
294         final IoWriteFuture ioWriteFuture = asyncIn.writeBuffer(new ByteArrayBuffer());
295
296         final NettyAwareChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
297         final ClientSession sshSession = getMockedSshSession(subsystemChannel);
298         final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
299
300         sshConnectListener.operationComplete(connectFuture);
301         sshAuthListener.operationComplete(getSuccessAuthFuture());
302         sshChannelOpenListener.operationComplete(getSuccessOpenFuture());
303
304         final ChannelPromise firstWritePromise = getMockedPromise();
305
306         // intercept listener for first write,
307         // so we can invoke successful write later thus simulate pending of the first write
308         final ListenableFuture<SshFutureListener<IoWriteFuture>> firstWriteListenerFuture =
309                 stubAddListener(ioWriteFuture);
310         asyncSshHandler.write(ctx, Unpooled.copiedBuffer(new byte[]{0,1,2,3,4,5}), firstWritePromise);
311
312         final ChannelPromise secondWritePromise = getMockedPromise();
313         // now make write throw pending exception
314         doThrow(WritePendingException.class).when(asyncIn).writeBuffer(any(Buffer.class));
315         for (int i = 0; i < 1001; i++) {
316             asyncSshHandler.write(ctx, Unpooled.copiedBuffer(new byte[]{0, 1, 2, 3, 4, 5}), secondWritePromise);
317         }
318
319         verify(secondWritePromise, times(1)).setFailure(any(Throwable.class));
320     }
321
322     @Test
323     public void testDisconnect() throws Exception {
324         asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
325
326         final IoInputStream asyncOut = getMockedIoInputStream();
327         final IoOutputStream asyncIn = getMockedIoOutputStream();
328         final NettyAwareChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
329         final ClientSession sshSession = getMockedSshSession(subsystemChannel);
330         final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
331
332         sshConnectListener.operationComplete(connectFuture);
333         sshAuthListener.operationComplete(getSuccessAuthFuture());
334         sshChannelOpenListener.operationComplete(getSuccessOpenFuture());
335
336         final ChannelPromise disconnectPromise = getMockedPromise();
337         asyncSshHandler.disconnect(ctx, disconnectPromise);
338
339         verify(sshSession).close(anyBoolean());
340         verify(disconnectPromise).setSuccess();
341         //verify(ctx).fireChannelInactive();
342     }
343
344     private static OpenFuture getSuccessOpenFuture() {
345         final OpenFuture openFuture = mock(OpenFuture.class);
346         doReturn(null).when(openFuture).getException();
347         return openFuture;
348     }
349
350     private static AuthFuture getSuccessAuthFuture() {
351         final AuthFuture authFuture = mock(AuthFuture.class);
352         doReturn(null).when(authFuture).getException();
353         return authFuture;
354     }
355
356     private static ConnectFuture getSuccessConnectFuture(final ClientSession sshSession) {
357         final ConnectFuture connectFuture = mock(ConnectFuture.class);
358         doReturn(null).when(connectFuture).getException();
359
360         doReturn(sshSession).when(connectFuture).getSession();
361         return connectFuture;
362     }
363
364     private static NettyAwareClientSession getMockedSshSession(final NettyAwareChannelSubsystem subsystemChannel)
365             throws IOException {
366         final NettyAwareClientSession sshSession = mock(NettyAwareClientSession.class);
367
368         doReturn("serverVersion").when(sshSession).getServerVersion();
369         doReturn(false).when(sshSession).isClosed();
370         doReturn(false).when(sshSession).isClosing();
371         final CloseFuture closeFuture = mock(CloseFuture.class);
372         Futures.addCallback(stubAddListener(closeFuture), new SuccessFutureListener<CloseFuture>() {
373             @Override
374             public void onSuccess(final SshFutureListener<CloseFuture> result) {
375                 doReturn(true).when(closeFuture).isClosed();
376                 result.operationComplete(closeFuture);
377             }
378         }, MoreExecutors.directExecutor());
379         doReturn(closeFuture).when(sshSession).close(false);
380
381         doReturn(subsystemChannel).when(sshSession).createSubsystemChannel(eq("netconf"), any());
382
383         return sshSession;
384     }
385
386     private NettyAwareChannelSubsystem getMockedSubsystemChannel(final IoInputStream asyncOut,
387                                                        final IoOutputStream asyncIn) throws IOException {
388         final NettyAwareChannelSubsystem subsystemChannel = mock(NettyAwareChannelSubsystem.class);
389
390         doNothing().when(subsystemChannel).setStreaming(any(ClientChannel.Streaming.class));
391         final OpenFuture openFuture = mock(OpenFuture.class);
392
393         Futures.addCallback(stubAddListener(openFuture), new SuccessFutureListener<OpenFuture>() {
394             @Override
395             public void onSuccess(final SshFutureListener<OpenFuture> result) {
396                 sshChannelOpenListener = result;
397             }
398         }, MoreExecutors.directExecutor());
399
400         doReturn(openFuture).when(subsystemChannel).open();
401         doReturn(asyncIn).when(subsystemChannel).getAsyncIn();
402         doNothing().when(subsystemChannel).onClose(any());
403         doNothing().when(subsystemChannel).close();
404         return subsystemChannel;
405     }
406
407     private static IoOutputStream getMockedIoOutputStream() throws IOException {
408         final IoOutputStream mock = mock(IoOutputStream.class);
409         final IoWriteFuture ioWriteFuture = mock(IoWriteFuture.class);
410         doReturn(true).when(ioWriteFuture).isWritten();
411
412         Futures.addCallback(stubAddListener(ioWriteFuture), new SuccessFutureListener<IoWriteFuture>() {
413             @Override
414             public void onSuccess(final SshFutureListener<IoWriteFuture> result) {
415                 result.operationComplete(ioWriteFuture);
416             }
417         }, MoreExecutors.directExecutor());
418
419         doReturn(ioWriteFuture).when(mock).writeBuffer(any(Buffer.class));
420         doReturn(false).when(mock).isClosed();
421         doReturn(false).when(mock).isClosing();
422         return mock;
423     }
424
425     private static IoInputStream getMockedIoInputStream() {
426         final IoInputStream mock = mock(IoInputStream.class);
427         final IoReadFuture ioReadFuture = mock(IoReadFuture.class);
428         // Always success for read
429         Futures.addCallback(stubAddListener(ioReadFuture), new SuccessFutureListener<IoReadFuture>() {
430             @Override
431             public void onSuccess(final SshFutureListener<IoReadFuture> result) {
432                 result.operationComplete(ioReadFuture);
433             }
434         }, MoreExecutors.directExecutor());
435         return mock;
436     }
437
438     @Test
439     public void testConnectFailOpenChannel() throws Exception {
440         asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
441
442         final IoInputStream asyncOut = getMockedIoInputStream();
443         final IoOutputStream asyncIn = getMockedIoOutputStream();
444         final NettyAwareChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
445         final ClientSession sshSession = getMockedSshSession(subsystemChannel);
446         final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
447
448         sshConnectListener.operationComplete(connectFuture);
449
450         sshAuthListener.operationComplete(getSuccessAuthFuture());
451
452         verify(subsystemChannel).setStreaming(ClientChannel.Streaming.Async);
453
454         sshChannelOpenListener.operationComplete(getFailedOpenFuture());
455         verify(promise).setFailure(any(Throwable.class));
456     }
457
458     @Test
459     public void testConnectFailAuth() throws Exception {
460         asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
461
462         final NettyAwareClientSession sshSession = mock(NettyAwareClientSession.class);
463         doReturn(true).when(sshSession).isClosed();
464         final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
465
466         sshConnectListener.operationComplete(connectFuture);
467
468         final AuthFuture authFuture = getFailedAuthFuture();
469
470         sshAuthListener.operationComplete(authFuture);
471         verify(promise).setFailure(any(Throwable.class));
472         asyncSshHandler.close(ctx, getMockedPromise());
473         verify(ctx, times(0)).fireChannelInactive();
474     }
475
476     private static AuthFuture getFailedAuthFuture() {
477         final AuthFuture authFuture = mock(AuthFuture.class);
478         doReturn(new IllegalStateException()).when(authFuture).getException();
479         return authFuture;
480     }
481
482     private static OpenFuture getFailedOpenFuture() {
483         final OpenFuture openFuture = mock(OpenFuture.class);
484         doReturn(new IllegalStateException()).when(openFuture).getException();
485         return openFuture;
486     }
487
488     @Test
489     public void testConnectFail() throws Exception {
490         asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
491
492         final ConnectFuture connectFuture = getFailedConnectFuture();
493         sshConnectListener.operationComplete(connectFuture);
494         verify(promise).setFailure(any(Throwable.class));
495     }
496
497     private static ConnectFuture getFailedConnectFuture() {
498         final ConnectFuture connectFuture = mock(ConnectFuture.class);
499         doReturn(new IllegalStateException()).when(connectFuture).getException();
500         return connectFuture;
501     }
502
503     private ChannelPromise getMockedPromise() {
504         return spy(new DefaultChannelPromise(channel));
505     }
506
507     private abstract static class SuccessFutureListener<T extends SshFuture<T>>
508             implements FutureCallback<SshFutureListener<T>> {
509
510         @Override
511         public abstract void onSuccess(SshFutureListener<T> result);
512
513         @Override
514         public void onFailure(final Throwable throwable) {
515             throw new RuntimeException(throwable);
516         }
517     }
518 }