2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.netconf.nettyutil.handler.ssh.client;
10 import static org.mockito.ArgumentMatchers.any;
11 import static org.mockito.ArgumentMatchers.anyBoolean;
12 import static org.mockito.ArgumentMatchers.eq;
13 import static org.mockito.Mockito.doAnswer;
14 import static org.mockito.Mockito.doNothing;
15 import static org.mockito.Mockito.doReturn;
16 import static org.mockito.Mockito.doThrow;
17 import static org.mockito.Mockito.mock;
18 import static org.mockito.Mockito.spy;
19 import static org.mockito.Mockito.times;
20 import static org.mockito.Mockito.verify;
21 import static org.mockito.Mockito.verifyNoMoreInteractions;
23 import com.google.common.util.concurrent.FutureCallback;
24 import com.google.common.util.concurrent.Futures;
25 import com.google.common.util.concurrent.ListenableFuture;
26 import com.google.common.util.concurrent.MoreExecutors;
27 import com.google.common.util.concurrent.SettableFuture;
28 import io.netty.buffer.Unpooled;
29 import io.netty.channel.Channel;
30 import io.netty.channel.ChannelConfig;
31 import io.netty.channel.ChannelFuture;
32 import io.netty.channel.ChannelHandlerContext;
33 import io.netty.channel.ChannelPromise;
34 import io.netty.channel.DefaultChannelPromise;
35 import io.netty.util.concurrent.EventExecutor;
36 import java.io.IOException;
37 import java.net.SocketAddress;
38 import java.util.concurrent.TimeUnit;
39 import org.junit.After;
40 import org.junit.Before;
41 import org.junit.Ignore;
42 import org.junit.Test;
43 import org.junit.runner.RunWith;
44 import org.mockito.Mock;
45 import org.mockito.junit.MockitoJUnitRunner;
46 import org.opendaylight.netconf.nettyutil.handler.ssh.authentication.AuthenticationHandler;
47 import org.opendaylight.netconf.shaded.sshd.client.channel.ClientChannel;
48 import org.opendaylight.netconf.shaded.sshd.client.future.AuthFuture;
49 import org.opendaylight.netconf.shaded.sshd.client.future.ConnectFuture;
50 import org.opendaylight.netconf.shaded.sshd.client.future.OpenFuture;
51 import org.opendaylight.netconf.shaded.sshd.client.session.ClientSession;
52 import org.opendaylight.netconf.shaded.sshd.common.future.CloseFuture;
53 import org.opendaylight.netconf.shaded.sshd.common.future.SshFuture;
54 import org.opendaylight.netconf.shaded.sshd.common.future.SshFutureListener;
55 import org.opendaylight.netconf.shaded.sshd.common.io.IoInputStream;
56 import org.opendaylight.netconf.shaded.sshd.common.io.IoOutputStream;
57 import org.opendaylight.netconf.shaded.sshd.common.io.IoReadFuture;
58 import org.opendaylight.netconf.shaded.sshd.common.io.IoWriteFuture;
59 import org.opendaylight.netconf.shaded.sshd.common.io.WritePendingException;
60 import org.opendaylight.netconf.shaded.sshd.common.util.buffer.Buffer;
61 import org.opendaylight.netconf.shaded.sshd.common.util.buffer.ByteArrayBuffer;
// Tests for AsyncSshHandler, run with Mockito's strict-stubs JUnit runner.
63 @RunWith(MockitoJUnitRunner.StrictStubs.class)
64 public class AsyncSshHandlerTest {
// Collaborators passed to the AsyncSshHandler constructor in setUp().
// NOTE(review): none of the fields down to 'executor' is assigned in the visible code;
// presumably they are injected as @Mock by the runner - confirm.
67 private NetconfSshClient sshClient;
69 private AuthenticationHandler authHandler;
// Netty-side objects that the stub* helpers configure and the tests pass to the handler.
71 private ChannelHandlerContext ctx;
73 private Channel channel;
75 private SocketAddress remoteAddress;
77 private SocketAddress localAddress;
79 private ChannelConfig channelConfig;
81 private EventExecutor executor;
// Handler under test, created in setUp().
83 private AsyncSshHandler asyncSshHandler;
// Listeners captured from addListener(...) calls on the mocked SSH futures, kept so that
// tests can complete each stage (connect, auth, channel open) by hand.
85 private SshFutureListener<ConnectFuture> sshConnectListener;
86 private SshFutureListener<AuthFuture> sshAuthListener;
87 private SshFutureListener<OpenFuture> sshChannelOpenListener;
// Promise the tests hand to asyncSshHandler.connect(...).
88 private ChannelPromise promise;
// Builds the fixture: the connect promise and the handler under test.
91 public void setUp() throws Exception {
// Spy promise later passed to asyncSshHandler.connect(...) by the tests.
97 promise = getMockedPromise();
// Handler under test, wired to the authHandler and sshClient fields.
99 asyncSshHandler = new AsyncSshHandler(authHandler, sshClient);
// Clears the captured listeners and closes the handler.
103 public void tearDown() throws Exception {
// Drop the listeners stored by the stub callbacks.
104 sshConnectListener = null;
105 sshAuthListener = null;
106 sshChannelOpenListener = null;
// Close with a fresh spy promise rather than the connect promise.
108 asyncSshHandler.close(ctx, getMockedPromise());
// Stubs authHandler: user name "usr" and an AuthFuture mock returned from authenticate(...).
111 private void stubAuth() throws IOException {
112 doReturn("usr").when(authHandler).getUsername();
114 final AuthFuture authFuture = mock(AuthFuture.class);
// Store the listener added to the mocked AuthFuture in sshAuthListener so that tests
// can report the authentication result themselves.
115 Futures.addCallback(stubAddListener(authFuture), new SuccessFutureListener<AuthFuture>() {
117 public void onSuccess(final SshFutureListener<AuthFuture> result) {
118 sshAuthListener = result;
120 }, MoreExecutors.directExecutor());
// Any ClientSession passed to authenticate(...) yields the mocked future.
121 doReturn(authFuture).when(authHandler).authenticate(any(ClientSession.class));
// Stubs future.addListener(...) and exposes the listener passed to it.
// 'future' must be a Mockito mock, since doAnswer(...).when(future) is applied to it.
// Returns a ListenableFuture that is set with the first argument of the addListener call.
// The cast of that argument to SshFutureListener<T> is unchecked, hence the suppression.
124 @SuppressWarnings("unchecked")
125 private static <T extends SshFuture<T>> ListenableFuture<SshFutureListener<T>> stubAddListener(final T future) {
126 final SettableFuture<SshFutureListener<T>> listenerSettableFuture = SettableFuture.create();
// Publish the listener handed to addListener(...) through the settable future.
128 doAnswer(invocation -> {
129 listenerSettableFuture.set((SshFutureListener<T>) invocation.getArguments()[0]);
131 }).when(future).addListener(any(SshFutureListener.class));
133 return listenerSettableFuture;
// Stubs the ChannelHandlerContext mock and the executor it returns.
136 private void stubCtx() {
137 doReturn(channel).when(ctx).channel();
// The fire* calls return ctx itself.
138 doReturn(ctx).when(ctx).fireChannelActive();
139 doReturn(ctx).when(ctx).fireChannelInactive();
140 doReturn(mock(ChannelFuture.class)).when(ctx).disconnect(any(ChannelPromise.class));
// getMockedPromise() is evaluated once here, so newPromise() hands out that one spy.
141 doReturn(getMockedPromise()).when(ctx).newPromise();
142 doReturn(executor).when(ctx).executor();
// Run anything submitted to the executor immediately, on the calling thread.
143 doAnswer(invocation -> {
144 invocation.getArgument(0, Runnable.class).run();
146 }).when(executor).execute(any());
// Gives the channel mock the fixed toString() value "channel".
149 private void stubChannel() {
150 doReturn("channel").when(channel).toString();
// Stubs sshClient.connect(...) and the channel configuration used for the connect timeout.
153 private void stubSshClient() throws IOException {
154 final ConnectFuture connectFuture = mock(ConnectFuture.class);
// Store the listener added to the mocked ConnectFuture in sshConnectListener so that
// tests can report the connect result themselves.
155 Futures.addCallback(stubAddListener(connectFuture), new SuccessFutureListener<ConnectFuture>() {
157 public void onSuccess(final SshFutureListener<ConnectFuture> result) {
158 sshConnectListener = result;
160 }, MoreExecutors.directExecutor());
// "usr" is the same user name that stubAuth() makes authHandler report.
161 doReturn(connectFuture).when(sshClient).connect("usr", remoteAddress);
162 doReturn(channelConfig).when(channel).config();
// Connect timeout of 1 ms; verify(...) below is stubbed for exactly that value.
163 doReturn(1).when(channelConfig).getConnectTimeoutMillis();
164 doReturn(connectFuture).when(connectFuture).verify(1,TimeUnit.MILLISECONDS);
// Successful connect: all three stages complete without exception, then the handler is closed.
168 public void testConnectSuccess() throws Exception {
169 asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
// Mocked SSH session, subsystem channel and its streams.
171 final IoInputStream asyncOut = getMockedIoInputStream();
172 final IoOutputStream asyncIn = getMockedIoOutputStream();
173 final NettyAwareChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
174 final ClientSession sshSession = getMockedSshSession(subsystemChannel);
175 final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
// Complete connect, authentication and channel open in order through the captured listeners.
177 sshConnectListener.operationComplete(connectFuture);
178 sshAuthListener.operationComplete(getSuccessAuthFuture());
179 sshChannelOpenListener.operationComplete(getSuccessOpenFuture());
181 verify(subsystemChannel).setStreaming(ClientChannel.Streaming.Async);
// The connect promise succeeds and the channel is reported active.
183 verify(promise).setSuccess();
184 verify(ctx).fireChannelActive();
// Closing an established connection reports the channel inactive.
185 asyncSshHandler.close(ctx, getMockedPromise());
186 verify(ctx).fireChannelInactive();
// A write on an established connection completes its promise successfully.
190 public void testWrite() throws Exception {
191 asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
193 final IoInputStream asyncOut = getMockedIoInputStream();
194 final IoOutputStream asyncIn = getMockedIoOutputStream();
195 final NettyAwareChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
196 final ClientSession sshSession = getMockedSshSession(subsystemChannel);
197 final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
// Establish the connection: connect, authenticate, open channel.
199 sshConnectListener.operationComplete(connectFuture);
200 sshAuthListener.operationComplete(getSuccessAuthFuture());
201 sshChannelOpenListener.operationComplete(getSuccessOpenFuture());
// The write future from getMockedIoOutputStream() notifies its listener as soon as it is
// added and reports a null exception, so the write is expected to succeed.
203 final ChannelPromise writePromise = getMockedPromise();
204 asyncSshHandler.write(ctx, Unpooled.copiedBuffer(new byte[]{0, 1, 2, 3, 4, 5}), writePromise);
206 verify(writePromise).setSuccess();
// A write whose IoWriteFuture reports an exception fails the write promise.
210 public void testWriteClosed() throws Exception {
211 asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
213 final IoInputStream asyncOut = getMockedIoInputStream();
214 final IoOutputStream asyncIn = getMockedIoOutputStream();
// Fetch the IoWriteFuture mock that asyncIn is stubbed to return for any buffer.
216 final IoWriteFuture ioWriteFuture = asyncIn.writeBuffer(new ByteArrayBuffer());
// Re-stub addListener: once a listener is added, make the future report an
// IllegalStateException and notify the listener right away.
218 Futures.addCallback(stubAddListener(ioWriteFuture), new SuccessFutureListener<IoWriteFuture>() {
220 public void onSuccess(final SshFutureListener<IoWriteFuture> result) {
221 doReturn(new IllegalStateException()).when(ioWriteFuture).getException();
222 result.operationComplete(ioWriteFuture);
224 }, MoreExecutors.directExecutor());
226 final NettyAwareChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
227 final ClientSession sshSession = getMockedSshSession(subsystemChannel);
228 final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
// Establish the connection: connect, authenticate, open channel.
230 sshConnectListener.operationComplete(connectFuture);
231 sshAuthListener.operationComplete(getSuccessAuthFuture());
232 sshChannelOpenListener.operationComplete(getSuccessOpenFuture());
234 final ChannelPromise writePromise = getMockedPromise();
235 asyncSshHandler.write(ctx, Unpooled.copiedBuffer(new byte[]{0,1,2,3,4,5}), writePromise);
// The failed write must surface as a failure on the write promise.
237 verify(writePromise).setFailure(any(Throwable.class));
// Two writes are issued while the first one's completion listener is held back; once the
// held listeners are completed by hand, both write promises are expected to succeed.
241 public void testWritePendingOne() throws Exception {
242 asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
244 final IoInputStream asyncOut = getMockedIoInputStream();
245 final IoOutputStream asyncIn = getMockedIoOutputStream();
// Fetch the IoWriteFuture mock that asyncIn is stubbed to return for any buffer.
246 final IoWriteFuture ioWriteFuture = asyncIn.writeBuffer(new ByteArrayBuffer());
248 final NettyAwareChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
249 final ClientSession sshSession = getMockedSshSession(subsystemChannel);
250 final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
// Establish the connection: connect, authenticate, open channel.
252 sshConnectListener.operationComplete(connectFuture);
253 sshAuthListener.operationComplete(getSuccessAuthFuture());
254 sshChannelOpenListener.operationComplete(getSuccessOpenFuture());
256 final ChannelPromise firstWritePromise = getMockedPromise();
258 // intercept listener for first write,
259 // so we can invoke successful write later thus simulate pending of the first write
260 final ListenableFuture<SshFutureListener<IoWriteFuture>> firstWriteListenerFuture =
261 stubAddListener(ioWriteFuture);
262 asyncSshHandler.write(ctx, Unpooled.copiedBuffer(new byte[]{0,1,2,3,4,5}), firstWritePromise);
263 final SshFutureListener<IoWriteFuture> firstWriteListener = firstWriteListenerFuture.get();
264 // intercept second listener,
265 // it is the one through which the queued second write learns that the pending state has ended
266 final ListenableFuture<SshFutureListener<IoWriteFuture>> pendingListener = stubAddListener(ioWriteFuture);
268 final ChannelPromise secondWritePromise = getMockedPromise();
269 asyncSshHandler.write(ctx, Unpooled.copiedBuffer(new byte[]{0, 1, 2, 3, 4, 5}), secondWritePromise);
// Re-stub writeBuffer to keep returning the same future for the writes that follow.
271 doReturn(ioWriteFuture).when(asyncIn).writeBuffer(any(Buffer.class));
// Neither promise may have been touched while both listeners are still held back.
273 verifyNoMoreInteractions(firstWritePromise, secondWritePromise);
275 // make first write stop pending
276 firstWriteListener.operationComplete(ioWriteFuture);
278 // notify listener for second write that pending has ended
279 pendingListener.get().operationComplete(ioWriteFuture);
281 // verify both write promises successful
282 verify(firstWritePromise).setSuccess();
283 verify(secondWritePromise).setSuccess();
// Disabled test: expects a write promise to fail exactly once when 1001 writes are issued
// while writeBuffer(...) keeps throwing WritePendingException. The @Ignore reason states
// that the pending queue is not limited.
286 @Ignore("Pending queue is not limited")
288 public void testWritePendingMax() throws Exception {
289 asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
291 final IoInputStream asyncOut = getMockedIoInputStream();
292 final IoOutputStream asyncIn = getMockedIoOutputStream();
// Fetch the IoWriteFuture mock that asyncIn is stubbed to return for any buffer.
293 final IoWriteFuture ioWriteFuture = asyncIn.writeBuffer(new ByteArrayBuffer());
295 final NettyAwareChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
296 final ClientSession sshSession = getMockedSshSession(subsystemChannel);
297 final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
// Establish the connection: connect, authenticate, open channel.
299 sshConnectListener.operationComplete(connectFuture);
300 sshAuthListener.operationComplete(getSuccessAuthFuture());
301 sshChannelOpenListener.operationComplete(getSuccessOpenFuture());
303 final ChannelPromise firstWritePromise = getMockedPromise();
305 // intercept listener for first write,
306 // so we can invoke successful write later thus simulate pending of the first write
307 final ListenableFuture<SshFutureListener<IoWriteFuture>> firstWriteListenerFuture =
308 stubAddListener(ioWriteFuture);
309 asyncSshHandler.write(ctx, Unpooled.copiedBuffer(new byte[]{0,1,2,3,4,5}), firstWritePromise);
311 final ChannelPromise secondWritePromise = getMockedPromise();
312 // now make write throw pending exception
313 doThrow(WritePendingException.class).when(asyncIn).writeBuffer(any(Buffer.class));
// Issue 1001 writes that all share secondWritePromise.
314 for (int i = 0; i < 1001; i++) {
315 asyncSshHandler.write(ctx, Unpooled.copiedBuffer(new byte[]{0, 1, 2, 3, 4, 5}), secondWritePromise);
// Exactly one failure is expected on the shared promise.
318 verify(secondWritePromise, times(1)).setFailure(any(Throwable.class));
// Disconnecting an established connection closes the SSH session and succeeds the promise.
322 public void testDisconnect() throws Exception {
323 asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
325 final IoInputStream asyncOut = getMockedIoInputStream();
326 final IoOutputStream asyncIn = getMockedIoOutputStream();
327 final NettyAwareChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
328 final ClientSession sshSession = getMockedSshSession(subsystemChannel);
329 final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
// Establish the connection: connect, authenticate, open channel.
331 sshConnectListener.operationComplete(connectFuture);
332 sshAuthListener.operationComplete(getSuccessAuthFuture());
333 sshChannelOpenListener.operationComplete(getSuccessOpenFuture());
335 final ChannelPromise disconnectPromise = getMockedPromise();
336 asyncSshHandler.disconnect(ctx, disconnectPromise);
// The session must be closed (with either flag value) and the promise completed.
338 verify(sshSession).close(anyBoolean());
339 verify(disconnectPromise).setSuccess();
// NOTE(review): the fireChannelInactive() check below is disabled; no reason is given here.
340 //verify(ctx).fireChannelInactive();
// Builds an OpenFuture mock that reports no exception, i.e. a successful channel open.
343 private static OpenFuture getSuccessOpenFuture() {
344 final OpenFuture openFuture = mock(OpenFuture.class);
345 doReturn(null).when(openFuture).getException();
// Builds an AuthFuture mock that reports no exception, i.e. a successful authentication.
349 private static AuthFuture getSuccessAuthFuture() {
350 final AuthFuture authFuture = mock(AuthFuture.class);
351 doReturn(null).when(authFuture).getException();
// Builds a ConnectFuture mock that reports no exception and yields the given session.
355 private static ConnectFuture getSuccessConnectFuture(final ClientSession sshSession) {
356 final ConnectFuture connectFuture = mock(ConnectFuture.class);
357 doReturn(null).when(connectFuture).getException();
// The session the caller expects to obtain from the completed connect.
359 doReturn(sshSession).when(connectFuture).getSession();
360 return connectFuture;
// Builds an open (neither closed nor closing) NettyAwareClientSession mock that hands out
// the given subsystem channel for the "netconf" subsystem.
363 private static NettyAwareClientSession getMockedSshSession(final NettyAwareChannelSubsystem subsystemChannel)
365 final NettyAwareClientSession sshSession = mock(NettyAwareClientSession.class);
367 doReturn("serverVersion").when(sshSession).getServerVersion();
368 doReturn(false).when(sshSession).isClosed();
369 doReturn(false).when(sshSession).isClosing();
// close(false) returns a CloseFuture that, as soon as a listener is added, starts
// reporting isClosed() == true and notifies that listener.
370 final CloseFuture closeFuture = mock(CloseFuture.class);
371 Futures.addCallback(stubAddListener(closeFuture), new SuccessFutureListener<CloseFuture>() {
373 public void onSuccess(final SshFutureListener<CloseFuture> result) {
374 doReturn(true).when(closeFuture).isClosed();
375 result.operationComplete(closeFuture);
377 }, MoreExecutors.directExecutor());
378 doReturn(closeFuture).when(sshSession).close(false);
// Only the "netconf" subsystem name is stubbed; any ChannelHandlerContext is accepted.
380 doReturn(subsystemChannel).when(sshSession).createSubsystemChannel(eq("netconf"),
381 any(ChannelHandlerContext.class));
// Builds a NettyAwareChannelSubsystem mock whose open() future exposes its listener through
// sshChannelOpenListener and whose getAsyncIn() returns the given output stream.
// NOTE(review): the asyncOut parameter is not referenced in the visible lines - confirm
// whether it is still needed.
386 private NettyAwareChannelSubsystem getMockedSubsystemChannel(final IoInputStream asyncOut,
387 final IoOutputStream asyncIn) throws IOException {
388 final NettyAwareChannelSubsystem subsystemChannel = mock(NettyAwareChannelSubsystem.class);
390 doNothing().when(subsystemChannel).setStreaming(any(ClientChannel.Streaming.class));
391 final OpenFuture openFuture = mock(OpenFuture.class);
// Store the listener added to the open future so that tests decide the open outcome.
393 Futures.addCallback(stubAddListener(openFuture), new SuccessFutureListener<OpenFuture>() {
395 public void onSuccess(final SshFutureListener<OpenFuture> result) {
396 sshChannelOpenListener = result;
398 }, MoreExecutors.directExecutor());
400 doReturn(openFuture).when(subsystemChannel).open();
401 doReturn(asyncIn).when(subsystemChannel).getAsyncIn();
// onClose(...) and close() are stubbed as no-ops.
402 doNothing().when(subsystemChannel).onClose(any());
403 doNothing().when(subsystemChannel).close();
404 return subsystemChannel;
// Builds an open IoOutputStream mock whose writeBuffer(...) always returns one shared
// IoWriteFuture mock that reports a null exception.
407 private static IoOutputStream getMockedIoOutputStream() throws IOException {
408 final IoOutputStream mock = mock(IoOutputStream.class);
409 final IoWriteFuture ioWriteFuture = mock(IoWriteFuture.class);
410 doReturn(null).when(ioWriteFuture).getException();
// Notify a listener with the write future as soon as it is added to that future.
412 Futures.addCallback(stubAddListener(ioWriteFuture), new SuccessFutureListener<IoWriteFuture>() {
414 public void onSuccess(final SshFutureListener<IoWriteFuture> result) {
415 result.operationComplete(ioWriteFuture);
417 }, MoreExecutors.directExecutor());
// Every buffer written yields the same future instance.
419 doReturn(ioWriteFuture).when(mock).writeBuffer(any(Buffer.class));
420 doReturn(false).when(mock).isClosed();
421 doReturn(false).when(mock).isClosing();
// Builds an IoInputStream mock together with an IoReadFuture mock that notifies a listener
// as soon as one is added.
// NOTE(review): the visible lines do not stub any method of the stream mock itself, so the
// read future is not visibly connected to it - confirm against the full file.
425 private static IoInputStream getMockedIoInputStream() {
426 final IoInputStream mock = mock(IoInputStream.class);
427 final IoReadFuture ioReadFuture = mock(IoReadFuture.class);
428 // Always success for read
429 Futures.addCallback(stubAddListener(ioReadFuture), new SuccessFutureListener<IoReadFuture>() {
431 public void onSuccess(final SshFutureListener<IoReadFuture> result) {
432 result.operationComplete(ioReadFuture);
434 }, MoreExecutors.directExecutor());
// Connect and authentication succeed, but opening the channel fails: the connect promise
// must be failed.
439 public void testConnectFailOpenChannel() throws Exception {
440 asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
442 final IoInputStream asyncOut = getMockedIoInputStream();
443 final IoOutputStream asyncIn = getMockedIoOutputStream();
444 final NettyAwareChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
445 final ClientSession sshSession = getMockedSshSession(subsystemChannel);
446 final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
448 sshConnectListener.operationComplete(connectFuture);
450 sshAuthListener.operationComplete(getSuccessAuthFuture());
// Streaming mode is already set at this point, before the open result is reported.
452 verify(subsystemChannel).setStreaming(ClientChannel.Streaming.Async);
// Report an open future that carries an IllegalStateException.
454 sshChannelOpenListener.operationComplete(getFailedOpenFuture());
455 verify(promise).setFailure(any(Throwable.class));
// Connect succeeds but authentication fails: the connect promise must be failed and a
// later close must not report the channel inactive.
459 public void testConnectFailAuth() throws Exception {
460 asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
// Bare session mock that reports itself as already closed.
462 final NettyAwareClientSession sshSession = mock(NettyAwareClientSession.class);
463 doReturn(true).when(sshSession).isClosed();
464 final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
466 sshConnectListener.operationComplete(connectFuture);
// Report an auth future that carries an IllegalStateException.
468 final AuthFuture authFuture = getFailedAuthFuture();
470 sshAuthListener.operationComplete(authFuture);
471 verify(promise).setFailure(any(Throwable.class));
472 asyncSshHandler.close(ctx, getMockedPromise());
// fireChannelInactive() must never have been called.
473 verify(ctx, times(0)).fireChannelInactive();
// Builds an AuthFuture mock whose getException() returns an IllegalStateException.
476 private static AuthFuture getFailedAuthFuture() {
477 final AuthFuture authFuture = mock(AuthFuture.class);
478 doReturn(new IllegalStateException()).when(authFuture).getException();
// Builds an OpenFuture mock whose getException() returns an IllegalStateException.
482 private static OpenFuture getFailedOpenFuture() {
483 final OpenFuture openFuture = mock(OpenFuture.class);
484 doReturn(new IllegalStateException()).when(openFuture).getException();
// The SSH connect itself fails: the connect promise must be failed.
489 public void testConnectFail() throws Exception {
490 asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
// Report a connect future that carries an IllegalStateException.
492 final ConnectFuture connectFuture = getFailedConnectFuture();
493 sshConnectListener.operationComplete(connectFuture);
494 verify(promise).setFailure(any(Throwable.class));
// Builds a ConnectFuture mock whose getException() returns an IllegalStateException.
497 private static ConnectFuture getFailedConnectFuture() {
498 final ConnectFuture connectFuture = mock(ConnectFuture.class);
499 doReturn(new IllegalStateException()).when(connectFuture).getException();
500 return connectFuture;
// Creates a Mockito spy around a new DefaultChannelPromise bound to the channel field, so
// that tests can verify calls such as setSuccess() and setFailure(...) on it.
503 private ChannelPromise getMockedPromise() {
504 return spy(new DefaultChannelPromise(channel));
// FutureCallback base class for the listener-capturing callbacks in this test: subclasses
// implement onSuccess only, and any failure is rethrown as a RuntimeException.
507 private abstract static class SuccessFutureListener<T extends SshFuture<T>>
508 implements FutureCallback<SshFutureListener<T>> {
// Receives the SshFutureListener delivered through the future returned by stubAddListener.
511 public abstract void onSuccess(SshFutureListener<T> result);
// Wraps the failure so that it is not silently dropped.
514 public void onFailure(final Throwable throwable) {
515 throw new RuntimeException(throwable);