2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.netconf.nettyutil.handler.ssh.client;
10 import static org.mockito.ArgumentMatchers.any;
11 import static org.mockito.ArgumentMatchers.anyBoolean;
12 import static org.mockito.ArgumentMatchers.eq;
13 import static org.mockito.Mockito.doAnswer;
14 import static org.mockito.Mockito.doNothing;
15 import static org.mockito.Mockito.doReturn;
16 import static org.mockito.Mockito.doThrow;
17 import static org.mockito.Mockito.mock;
18 import static org.mockito.Mockito.spy;
19 import static org.mockito.Mockito.times;
20 import static org.mockito.Mockito.verify;
21 import static org.mockito.Mockito.verifyNoMoreInteractions;
23 import com.google.common.util.concurrent.FutureCallback;
24 import com.google.common.util.concurrent.Futures;
25 import com.google.common.util.concurrent.ListenableFuture;
26 import com.google.common.util.concurrent.MoreExecutors;
27 import com.google.common.util.concurrent.SettableFuture;
28 import io.netty.buffer.Unpooled;
29 import io.netty.channel.Channel;
30 import io.netty.channel.ChannelConfig;
31 import io.netty.channel.ChannelFuture;
32 import io.netty.channel.ChannelHandlerContext;
33 import io.netty.channel.ChannelPromise;
34 import io.netty.channel.DefaultChannelPromise;
35 import io.netty.util.concurrent.EventExecutor;
36 import java.io.IOException;
37 import java.net.SocketAddress;
38 import java.util.concurrent.TimeUnit;
39 import org.junit.After;
40 import org.junit.Before;
41 import org.junit.Ignore;
42 import org.junit.Test;
43 import org.junit.runner.RunWith;
44 import org.mockito.Mock;
45 import org.mockito.junit.MockitoJUnitRunner;
46 import org.opendaylight.netconf.nettyutil.handler.ssh.authentication.AuthenticationHandler;
47 import org.opendaylight.netconf.shaded.sshd.client.channel.ClientChannel;
48 import org.opendaylight.netconf.shaded.sshd.client.future.AuthFuture;
49 import org.opendaylight.netconf.shaded.sshd.client.future.ConnectFuture;
50 import org.opendaylight.netconf.shaded.sshd.client.future.OpenFuture;
51 import org.opendaylight.netconf.shaded.sshd.client.session.ClientSession;
52 import org.opendaylight.netconf.shaded.sshd.common.future.CloseFuture;
53 import org.opendaylight.netconf.shaded.sshd.common.future.SshFuture;
54 import org.opendaylight.netconf.shaded.sshd.common.future.SshFutureListener;
55 import org.opendaylight.netconf.shaded.sshd.common.io.IoInputStream;
56 import org.opendaylight.netconf.shaded.sshd.common.io.IoOutputStream;
57 import org.opendaylight.netconf.shaded.sshd.common.io.IoReadFuture;
58 import org.opendaylight.netconf.shaded.sshd.common.io.IoWriteFuture;
59 import org.opendaylight.netconf.shaded.sshd.common.io.WritePendingException;
60 import org.opendaylight.netconf.shaded.sshd.common.util.buffer.Buffer;
61 import org.opendaylight.netconf.shaded.sshd.common.util.buffer.ByteArrayBuffer;
63 @RunWith(MockitoJUnitRunner.StrictStubs.class)
64 public class AsyncSshHandlerTest {
67 private NetconfSshClient sshClient;
69 private AuthenticationHandler authHandler;
71 private ChannelHandlerContext ctx;
73 private Channel channel;
75 private SocketAddress remoteAddress;
77 private SocketAddress localAddress;
79 private ChannelConfig channelConfig;
81 private EventExecutor executor;
83 private AsyncSshHandler asyncSshHandler;
85 private SshFutureListener<ConnectFuture> sshConnectListener;
86 private SshFutureListener<AuthFuture> sshAuthListener;
87 private SshFutureListener<OpenFuture> sshChannelOpenListener;
88 private ChannelPromise promise;
// Creates the connect promise and the AsyncSshHandler instance used by each test.
// NOTE(review): presumably the JUnit @Before method; the annotation line is not visible here.
// NOTE(review): stubAuth/stubSshClient/stubChannel/stubCtx are not invoked anywhere in the visible
// text, yet the tests rely on the listeners they capture; presumably they are called at the top of
// this method on lines that are not shown here. Confirm against the full file.
91 public void setUp() throws Exception {
// Spied promise that the tests pass to asyncSshHandler.connect(...).
97 promise = getMockedPromise();
// Handler under test, built from the authentication handler and SSH client fields.
99 asyncSshHandler = new AsyncSshHandler(authHandler, sshClient);
103 public void tearDown() throws Exception {
104 sshConnectListener = null;
105 sshAuthListener = null;
106 sshChannelOpenListener = null;
108 asyncSshHandler.close(ctx, getMockedPromise());
111 private void stubAuth() throws IOException {
112 doReturn("usr").when(authHandler).getUsername();
114 final AuthFuture authFuture = mock(AuthFuture.class);
115 Futures.addCallback(stubAddListener(authFuture), new SuccessFutureListener<AuthFuture>() {
117 public void onSuccess(final SshFutureListener<AuthFuture> result) {
118 sshAuthListener = result;
120 }, MoreExecutors.directExecutor());
121 doReturn(authFuture).when(authHandler).authenticate(any(ClientSession.class));
124 @SuppressWarnings("unchecked")
125 private static <T extends SshFuture<T>> ListenableFuture<SshFutureListener<T>> stubAddListener(final T future) {
126 final SettableFuture<SshFutureListener<T>> listenerSettableFuture = SettableFuture.create();
128 doAnswer(invocation -> {
129 listenerSettableFuture.set((SshFutureListener<T>) invocation.getArguments()[0]);
131 }).when(future).addListener(any(SshFutureListener.class));
133 return listenerSettableFuture;
136 private void stubCtx() {
137 doReturn(channel).when(ctx).channel();
138 doReturn(ctx).when(ctx).fireChannelActive();
139 doReturn(ctx).when(ctx).fireChannelInactive();
140 doReturn(mock(ChannelFuture.class)).when(ctx).disconnect(any(ChannelPromise.class));
141 doReturn(getMockedPromise()).when(ctx).newPromise();
142 doReturn(executor).when(ctx).executor();
143 doAnswer(invocation -> {
144 invocation.getArgument(0, Runnable.class).run();
146 }).when(executor).execute(any());
149 private void stubChannel() {
150 doReturn("channel").when(channel).toString();
153 private void stubSshClient() throws IOException {
154 final ConnectFuture connectFuture = mock(ConnectFuture.class);
155 Futures.addCallback(stubAddListener(connectFuture), new SuccessFutureListener<ConnectFuture>() {
157 public void onSuccess(final SshFutureListener<ConnectFuture> result) {
158 sshConnectListener = result;
160 }, MoreExecutors.directExecutor());
161 doReturn(connectFuture).when(sshClient).connect("usr", remoteAddress);
162 doReturn(channelConfig).when(channel).config();
163 doReturn(1).when(channelConfig).getConnectTimeoutMillis();
164 doReturn(connectFuture).when(connectFuture).verify(1,TimeUnit.MILLISECONDS);
168 public void testConnectSuccess() throws Exception {
169 asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
171 final IoInputStream asyncOut = getMockedIoInputStream();
172 final IoOutputStream asyncIn = getMockedIoOutputStream();
173 final NettyAwareChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
174 final ClientSession sshSession = getMockedSshSession(subsystemChannel);
175 final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
177 sshConnectListener.operationComplete(connectFuture);
178 sshAuthListener.operationComplete(getSuccessAuthFuture());
179 sshChannelOpenListener.operationComplete(getSuccessOpenFuture());
181 verify(subsystemChannel).setStreaming(ClientChannel.Streaming.Async);
183 verify(promise).setSuccess();
184 verify(ctx).fireChannelActive();
185 asyncSshHandler.close(ctx, getMockedPromise());
186 verify(ctx).fireChannelInactive();
190 public void testWrite() throws Exception {
191 asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
193 final IoInputStream asyncOut = getMockedIoInputStream();
194 final IoOutputStream asyncIn = getMockedIoOutputStream();
195 final NettyAwareChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
196 final ClientSession sshSession = getMockedSshSession(subsystemChannel);
197 final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
199 sshConnectListener.operationComplete(connectFuture);
200 sshAuthListener.operationComplete(getSuccessAuthFuture());
201 sshChannelOpenListener.operationComplete(getSuccessOpenFuture());
203 final ChannelPromise writePromise = getMockedPromise();
204 asyncSshHandler.write(ctx, Unpooled.copiedBuffer(new byte[]{0, 1, 2, 3, 4, 5}), writePromise);
206 verify(writePromise).setSuccess();
210 public void testWriteClosed() throws Exception {
211 asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
213 final IoInputStream asyncOut = getMockedIoInputStream();
214 final IoOutputStream asyncIn = getMockedIoOutputStream();
216 final IoWriteFuture ioWriteFuture = asyncIn.writeBuffer(new ByteArrayBuffer());
218 Futures.addCallback(stubAddListener(ioWriteFuture), new SuccessFutureListener<IoWriteFuture>() {
220 public void onSuccess(final SshFutureListener<IoWriteFuture> result) {
221 doReturn(false).when(ioWriteFuture).isWritten();
222 doReturn(new IllegalStateException()).when(ioWriteFuture).getException();
223 result.operationComplete(ioWriteFuture);
225 }, MoreExecutors.directExecutor());
227 final NettyAwareChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
228 final ClientSession sshSession = getMockedSshSession(subsystemChannel);
229 final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
231 sshConnectListener.operationComplete(connectFuture);
232 sshAuthListener.operationComplete(getSuccessAuthFuture());
233 sshChannelOpenListener.operationComplete(getSuccessOpenFuture());
235 final ChannelPromise writePromise = getMockedPromise();
236 asyncSshHandler.write(ctx, Unpooled.copiedBuffer(new byte[]{0,1,2,3,4,5}), writePromise);
238 verify(writePromise).setFailure(any(Throwable.class));
242 public void testWritePendingOne() throws Exception {
243 asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
245 final IoInputStream asyncOut = getMockedIoInputStream();
246 final IoOutputStream asyncIn = getMockedIoOutputStream();
247 final IoWriteFuture ioWriteFuture = asyncIn.writeBuffer(new ByteArrayBuffer());
249 final NettyAwareChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
250 final ClientSession sshSession = getMockedSshSession(subsystemChannel);
251 final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
253 sshConnectListener.operationComplete(connectFuture);
254 sshAuthListener.operationComplete(getSuccessAuthFuture());
255 sshChannelOpenListener.operationComplete(getSuccessOpenFuture());
257 final ChannelPromise firstWritePromise = getMockedPromise();
259 // intercept listener for first write,
260 // so we can invoke successful write later thus simulate pending of the first write
261 final ListenableFuture<SshFutureListener<IoWriteFuture>> firstWriteListenerFuture =
262 stubAddListener(ioWriteFuture);
263 asyncSshHandler.write(ctx, Unpooled.copiedBuffer(new byte[]{0,1,2,3,4,5}), firstWritePromise);
264 final SshFutureListener<IoWriteFuture> firstWriteListener = firstWriteListenerFuture.get();
265 // intercept second listener,
266 // this is the listener for pending write for the pending write to know when pending state ended
267 final ListenableFuture<SshFutureListener<IoWriteFuture>> pendingListener = stubAddListener(ioWriteFuture);
269 final ChannelPromise secondWritePromise = getMockedPromise();
270 asyncSshHandler.write(ctx, Unpooled.copiedBuffer(new byte[]{0, 1, 2, 3, 4, 5}), secondWritePromise);
272 doReturn(ioWriteFuture).when(asyncIn).writeBuffer(any(Buffer.class));
274 verifyNoMoreInteractions(firstWritePromise, secondWritePromise);
276 // make first write stop pending
277 firstWriteListener.operationComplete(ioWriteFuture);
279 // notify listener for second write that pending has ended
280 pendingListener.get().operationComplete(ioWriteFuture);
282 // verify both write promises successful
283 verify(firstWritePromise).setSuccess();
284 verify(secondWritePromise).setSuccess();
287 @Ignore("Pending queue is not limited")
289 public void testWritePendingMax() throws Exception {
290 asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
292 final IoInputStream asyncOut = getMockedIoInputStream();
293 final IoOutputStream asyncIn = getMockedIoOutputStream();
294 final IoWriteFuture ioWriteFuture = asyncIn.writeBuffer(new ByteArrayBuffer());
296 final NettyAwareChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
297 final ClientSession sshSession = getMockedSshSession(subsystemChannel);
298 final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
300 sshConnectListener.operationComplete(connectFuture);
301 sshAuthListener.operationComplete(getSuccessAuthFuture());
302 sshChannelOpenListener.operationComplete(getSuccessOpenFuture());
304 final ChannelPromise firstWritePromise = getMockedPromise();
306 // intercept listener for first write,
307 // so we can invoke successful write later thus simulate pending of the first write
308 final ListenableFuture<SshFutureListener<IoWriteFuture>> firstWriteListenerFuture =
309 stubAddListener(ioWriteFuture);
310 asyncSshHandler.write(ctx, Unpooled.copiedBuffer(new byte[]{0,1,2,3,4,5}), firstWritePromise);
312 final ChannelPromise secondWritePromise = getMockedPromise();
313 // now make write throw pending exception
314 doThrow(WritePendingException.class).when(asyncIn).writeBuffer(any(Buffer.class));
315 for (int i = 0; i < 1001; i++) {
316 asyncSshHandler.write(ctx, Unpooled.copiedBuffer(new byte[]{0, 1, 2, 3, 4, 5}), secondWritePromise);
319 verify(secondWritePromise, times(1)).setFailure(any(Throwable.class));
323 public void testDisconnect() throws Exception {
324 asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
326 final IoInputStream asyncOut = getMockedIoInputStream();
327 final IoOutputStream asyncIn = getMockedIoOutputStream();
328 final NettyAwareChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
329 final ClientSession sshSession = getMockedSshSession(subsystemChannel);
330 final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
332 sshConnectListener.operationComplete(connectFuture);
333 sshAuthListener.operationComplete(getSuccessAuthFuture());
334 sshChannelOpenListener.operationComplete(getSuccessOpenFuture());
336 final ChannelPromise disconnectPromise = getMockedPromise();
337 asyncSshHandler.disconnect(ctx, disconnectPromise);
339 verify(sshSession).close(anyBoolean());
340 verify(disconnectPromise).setSuccess();
341 //verify(ctx).fireChannelInactive();
344 private static OpenFuture getSuccessOpenFuture() {
345 final OpenFuture openFuture = mock(OpenFuture.class);
346 doReturn(null).when(openFuture).getException();
350 private static AuthFuture getSuccessAuthFuture() {
351 final AuthFuture authFuture = mock(AuthFuture.class);
352 doReturn(null).when(authFuture).getException();
356 private static ConnectFuture getSuccessConnectFuture(final ClientSession sshSession) {
357 final ConnectFuture connectFuture = mock(ConnectFuture.class);
358 doReturn(null).when(connectFuture).getException();
360 doReturn(sshSession).when(connectFuture).getSession();
361 return connectFuture;
// Builds a mocked NettyAwareClientSession that reports itself as open, returns a close future from
// close(false), and returns the given subsystem channel from createSubsystemChannel("netconf", ...).
// NOTE(review): the rest of the signature (presumably a throws clause and the opening brace) and
// the final return statement are not visible here. Confirm against the full file.
364 private static NettyAwareClientSession getMockedSshSession(final NettyAwareChannelSubsystem subsystemChannel)
366 final NettyAwareClientSession sshSession = mock(NettyAwareClientSession.class);
368 doReturn("serverVersion").when(sshSession).getServerVersion();
369 doReturn(false).when(sshSession).isClosed();
370 doReturn(false).when(sshSession).isClosing();
// Close future whose registered listener is invoked immediately, after the future is marked closed.
371 final CloseFuture closeFuture = mock(CloseFuture.class);
372 Futures.addCallback(stubAddListener(closeFuture), new SuccessFutureListener<CloseFuture>() {
374 public void onSuccess(final SshFutureListener<CloseFuture> result) {
375 doReturn(true).when(closeFuture).isClosed();
376 result.operationComplete(closeFuture);
378 }, MoreExecutors.directExecutor());
379 doReturn(closeFuture).when(sshSession).close(false);
381 doReturn(subsystemChannel).when(sshSession).createSubsystemChannel(eq("netconf"), any());
386 private NettyAwareChannelSubsystem getMockedSubsystemChannel(final IoInputStream asyncOut,
387 final IoOutputStream asyncIn) throws IOException {
388 final NettyAwareChannelSubsystem subsystemChannel = mock(NettyAwareChannelSubsystem.class);
390 doNothing().when(subsystemChannel).setStreaming(any(ClientChannel.Streaming.class));
391 final OpenFuture openFuture = mock(OpenFuture.class);
393 Futures.addCallback(stubAddListener(openFuture), new SuccessFutureListener<OpenFuture>() {
395 public void onSuccess(final SshFutureListener<OpenFuture> result) {
396 sshChannelOpenListener = result;
398 }, MoreExecutors.directExecutor());
400 doReturn(openFuture).when(subsystemChannel).open();
401 doReturn(asyncIn).when(subsystemChannel).getAsyncIn();
402 doNothing().when(subsystemChannel).onClose(any());
403 doNothing().when(subsystemChannel).close();
404 return subsystemChannel;
407 private static IoOutputStream getMockedIoOutputStream() throws IOException {
408 final IoOutputStream mock = mock(IoOutputStream.class);
409 final IoWriteFuture ioWriteFuture = mock(IoWriteFuture.class);
410 doReturn(true).when(ioWriteFuture).isWritten();
412 Futures.addCallback(stubAddListener(ioWriteFuture), new SuccessFutureListener<IoWriteFuture>() {
414 public void onSuccess(final SshFutureListener<IoWriteFuture> result) {
415 result.operationComplete(ioWriteFuture);
417 }, MoreExecutors.directExecutor());
419 doReturn(ioWriteFuture).when(mock).writeBuffer(any(Buffer.class));
420 doReturn(false).when(mock).isClosed();
421 doReturn(false).when(mock).isClosing();
425 private static IoInputStream getMockedIoInputStream() {
426 final IoInputStream mock = mock(IoInputStream.class);
427 final IoReadFuture ioReadFuture = mock(IoReadFuture.class);
428 // Always success for read
429 Futures.addCallback(stubAddListener(ioReadFuture), new SuccessFutureListener<IoReadFuture>() {
431 public void onSuccess(final SshFutureListener<IoReadFuture> result) {
432 result.operationComplete(ioReadFuture);
434 }, MoreExecutors.directExecutor());
439 public void testConnectFailOpenChannel() throws Exception {
440 asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
442 final IoInputStream asyncOut = getMockedIoInputStream();
443 final IoOutputStream asyncIn = getMockedIoOutputStream();
444 final NettyAwareChannelSubsystem subsystemChannel = getMockedSubsystemChannel(asyncOut, asyncIn);
445 final ClientSession sshSession = getMockedSshSession(subsystemChannel);
446 final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
448 sshConnectListener.operationComplete(connectFuture);
450 sshAuthListener.operationComplete(getSuccessAuthFuture());
452 verify(subsystemChannel).setStreaming(ClientChannel.Streaming.Async);
454 sshChannelOpenListener.operationComplete(getFailedOpenFuture());
455 verify(promise).setFailure(any(Throwable.class));
459 public void testConnectFailAuth() throws Exception {
460 asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
462 final NettyAwareClientSession sshSession = mock(NettyAwareClientSession.class);
463 doReturn(true).when(sshSession).isClosed();
464 final ConnectFuture connectFuture = getSuccessConnectFuture(sshSession);
466 sshConnectListener.operationComplete(connectFuture);
468 final AuthFuture authFuture = getFailedAuthFuture();
470 sshAuthListener.operationComplete(authFuture);
471 verify(promise).setFailure(any(Throwable.class));
472 asyncSshHandler.close(ctx, getMockedPromise());
473 verify(ctx, times(0)).fireChannelInactive();
476 private static AuthFuture getFailedAuthFuture() {
477 final AuthFuture authFuture = mock(AuthFuture.class);
478 doReturn(new IllegalStateException()).when(authFuture).getException();
482 private static OpenFuture getFailedOpenFuture() {
483 final OpenFuture openFuture = mock(OpenFuture.class);
484 doReturn(new IllegalStateException()).when(openFuture).getException();
489 public void testConnectFail() throws Exception {
490 asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
492 final ConnectFuture connectFuture = getFailedConnectFuture();
493 sshConnectListener.operationComplete(connectFuture);
494 verify(promise).setFailure(any(Throwable.class));
497 private static ConnectFuture getFailedConnectFuture() {
498 final ConnectFuture connectFuture = mock(ConnectFuture.class);
499 doReturn(new IllegalStateException()).when(connectFuture).getException();
500 return connectFuture;
503 private ChannelPromise getMockedPromise() {
504 return spy(new DefaultChannelPromise(channel));
507 private abstract static class SuccessFutureListener<T extends SshFuture<T>>
508 implements FutureCallback<SshFutureListener<T>> {
511 public abstract void onSuccess(SshFutureListener<T> result);
514 public void onFailure(final Throwable throwable) {
515 throw new RuntimeException(throwable);