Merge "BUG-2340 Fix improper cleanup of resources in netconf ssh handler"
authorTony Tkacik <ttkacik@cisco.com>
Mon, 1 Dec 2014 08:25:10 +0000 (08:25 +0000)
committerGerrit Code Review <gerrit@opendaylight.org>
Mon, 1 Dec 2014 08:25:10 +0000 (08:25 +0000)
opendaylight/commons/protocol-framework/src/main/java/org/opendaylight/protocol/framework/ReconnectPromise.java
opendaylight/commons/protocol-framework/src/test/java/org/opendaylight/protocol/framework/ServerTest.java
opendaylight/md-sal/sal-netconf-connector/src/test/java/org/opendaylight/controller/sal/connect/netconf/listener/NetconfDeviceCommunicatorTest.java
opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/controller/netconf/nettyutil/handler/ssh/client/AsyncSshHandler.java
opendaylight/netconf/netconf-netty-util/src/test/java/org/opendaylight/controller/netconf/nettyutil/handler/ssh/client/AsyncSshHandlerTest.java
opendaylight/netconf/netconf-ssh/src/test/java/org/opendaylight/controller/netconf/netty/SSHTest.java

index 98a2c2c..b2ab27a 100644 (file)
@@ -80,7 +80,7 @@ final class ReconnectPromise<S extends ProtocolSession<?>, L extends SessionList
 
     /**
      * Channel handler that responds to channelInactive event and reconnects the session.
-     * Only if the initial connection was successfully established and promise was not canceled.
+     * Only if the promise was not canceled.
      */
     private static final class ClosedChannelHandler extends ChannelInboundHandlerAdapter {
         private final ReconnectPromise<?, ?> promise;
index fc38888..6c4af01 100644 (file)
@@ -250,6 +250,52 @@ public class ServerTest {
         assertFalse(session.isSuccess());
     }
 
+    @Test
+    public void testNegotiationFailedReconnect() throws Exception {
+        final Promise<Boolean> p = new DefaultPromise<>(GlobalEventExecutor.INSTANCE);
+
+        this.dispatcher = getServerDispatcher(p);
+
+        this.server = this.dispatcher.createServer(this.serverAddress, new SessionListenerFactory<SimpleSessionListener>() {
+            @Override
+            public SimpleSessionListener getSessionListener() {
+                return new SimpleSessionListener();
+            }
+        });
+
+        this.server.get();
+
+        this.clientDispatcher = new SimpleDispatcher(new SessionNegotiatorFactory<SimpleMessage, SimpleSession, SimpleSessionListener>() {
+            @Override
+            public SessionNegotiator<SimpleSession> getSessionNegotiator(final SessionListenerFactory<SimpleSessionListener> factory,
+                                                                         final Channel channel, final Promise<SimpleSession> promise) {
+
+                return new SimpleSessionNegotiator(promise, channel) {
+                    @Override
+                    protected void startNegotiation() throws Exception {
+                        negotiationFailed(new IllegalStateException("Negotiation failed"));
+                    }
+                };
+            }
+        }, new DefaultPromise<SimpleSession>(GlobalEventExecutor.INSTANCE), eventLoopGroup);
+
+        final ReconnectStrategyFactory reconnectStrategyFactory = mock(ReconnectStrategyFactory.class);
+        final ReconnectStrategy reconnectStrategy = getMockedReconnectStrategy();
+        doReturn(reconnectStrategy).when(reconnectStrategyFactory).createReconnectStrategy();
+
+        this.clientDispatcher.createReconnectingClient(this.serverAddress,
+                reconnectStrategyFactory, new SessionListenerFactory<SimpleSessionListener>() {
+                    @Override
+                    public SimpleSessionListener getSessionListener() {
+                        return new SimpleSessionListener();
+                    }
+                });
+
+
+        // Reconnect strategy should be consulted at least twice, for initial connect and reconnect attempts after drop
+        verify(reconnectStrategyFactory, timeout((int) TimeUnit.MINUTES.toMillis(3)).atLeast(2)).createReconnectStrategy();
+    }
+
     private SimpleDispatcher getClientDispatcher() {
         return new SimpleDispatcher(new SessionNegotiatorFactory<SimpleMessage, SimpleSession, SimpleSessionListener>() {
             @Override
index 001b9a8..a24034d 100644 (file)
@@ -19,6 +19,8 @@ import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.timeout;
 import static org.mockito.Mockito.verify;
 import static org.opendaylight.controller.netconf.api.xml.XmlNetconfConstants.RPC_REPLY_KEY;
 import static org.opendaylight.controller.netconf.api.xml.XmlNetconfConstants.URN_IETF_PARAMS_XML_NS_NETCONF_BASE_1_0;
@@ -27,9 +29,15 @@ import com.google.common.base.Strings;
 import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.ListenableFuture;
 import io.netty.channel.ChannelFuture;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.util.HashedWheelTimer;
+import io.netty.util.Timer;
 import io.netty.util.concurrent.Future;
 import io.netty.util.concurrent.GenericFutureListener;
+import io.netty.util.concurrent.GlobalEventExecutor;
 import java.io.ByteArrayInputStream;
+import java.net.InetSocketAddress;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.UUID;
@@ -45,11 +53,18 @@ import org.mockito.Mock;
 import org.mockito.MockitoAnnotations;
 import org.opendaylight.controller.netconf.api.NetconfMessage;
 import org.opendaylight.controller.netconf.api.NetconfTerminationReason;
+import org.opendaylight.controller.netconf.client.NetconfClientDispatcherImpl;
 import org.opendaylight.controller.netconf.client.NetconfClientSession;
+import org.opendaylight.controller.netconf.client.conf.NetconfClientConfiguration;
+import org.opendaylight.controller.netconf.client.conf.NetconfReconnectingClientConfigurationBuilder;
+import org.opendaylight.controller.netconf.nettyutil.handler.ssh.authentication.LoginPassword;
 import org.opendaylight.controller.sal.connect.api.RemoteDevice;
 import org.opendaylight.controller.sal.connect.api.RemoteDeviceCommunicator;
 import org.opendaylight.controller.sal.connect.netconf.util.NetconfMessageTransformUtil;
 import org.opendaylight.controller.sal.connect.util.RemoteDeviceId;
+import org.opendaylight.protocol.framework.ReconnectStrategy;
+import org.opendaylight.protocol.framework.ReconnectStrategyFactory;
+import org.opendaylight.protocol.framework.TimedReconnectStrategy;
 import org.opendaylight.yangtools.yang.common.QName;
 import org.opendaylight.yangtools.yang.common.RpcError;
 import org.opendaylight.yangtools.yang.common.RpcResult;
@@ -320,6 +335,60 @@ public class NetconfDeviceCommunicatorTest {
                       errorInfo.contains( "<bad-element>bar</bad-element>" ) );
     }
 
+    /**
+     * Test whether reconnect is scheduled properly
+     */
+    @Test
+    public void testNetconfDeviceReconnectInCommunicator() throws Exception {
+        final RemoteDevice<NetconfSessionCapabilities, NetconfMessage> device = mock(RemoteDevice.class);
+
+        final TimedReconnectStrategy timedReconnectStrategy = new TimedReconnectStrategy(GlobalEventExecutor.INSTANCE, 10000, 0, 1.0, null, 100L, null);
+        final ReconnectStrategy reconnectStrategy = spy(new ReconnectStrategy() {
+            @Override
+            public int getConnectTimeout() throws Exception {
+                return timedReconnectStrategy.getConnectTimeout();
+            }
+
+            @Override
+            public Future<Void> scheduleReconnect(final Throwable cause) {
+                return timedReconnectStrategy.scheduleReconnect(cause);
+            }
+
+            @Override
+            public void reconnectSuccessful() {
+                timedReconnectStrategy.reconnectSuccessful();
+            }
+        });
+
+        final NetconfDeviceCommunicator listener = new NetconfDeviceCommunicator(new RemoteDeviceId("test"), device);
+        final EventLoopGroup group = new NioEventLoopGroup();
+        final Timer time = new HashedWheelTimer();
+        try {
+            final NetconfClientConfiguration cfg = NetconfReconnectingClientConfigurationBuilder.create()
+                    .withAddress(new InetSocketAddress("localhost", 65000))
+                    .withReconnectStrategy(reconnectStrategy)
+                    .withConnectStrategyFactory(new ReconnectStrategyFactory() {
+                        @Override
+                        public ReconnectStrategy createReconnectStrategy() {
+                            return reconnectStrategy;
+                        }
+                    })
+                    .withAuthHandler(new LoginPassword("admin", "admin"))
+                    .withConnectionTimeoutMillis(10000)
+                    .withProtocol(NetconfClientConfiguration.NetconfClientProtocol.SSH)
+                    .withSessionListener(listener)
+                    .build();
+
+
+            listener.initializeRemoteConnection(new NetconfClientDispatcherImpl(group, group, time), cfg);
+
+            verify(reconnectStrategy, timeout((int) TimeUnit.MINUTES.toMillis(3)).times(101)).scheduleReconnect(any(Throwable.class));
+        } finally {
+            time.stop();
+            group.shutdownGracefully();
+        }
+    }
+
     @Test
     public void testOnResponseMessageWithWrongMessageID() throws Exception {
         setupSession();
index 14d753f..c8c9128 100644 (file)
@@ -137,7 +137,6 @@ public class AsyncSshHandler extends ChannelOutboundHandlerAdapter {
         LOG.trace("SSH subsystem channel opened successfully on channel: {}", ctx.channel());
 
         connectPromise.setSuccess();
-        connectPromise = null;
 
         // TODO we should also read from error stream and at least log from that
 
@@ -162,9 +161,12 @@ public class AsyncSshHandler extends ChannelOutboundHandlerAdapter {
 
     private synchronized void handleSshSetupFailure(final ChannelHandlerContext ctx, final Throwable e) {
         LOG.warn("Unable to setup SSH connection on channel: {}", ctx.channel(), e);
-        connectPromise.setFailure(e);
-        connectPromise = null;
-        throw new IllegalStateException("Unable to setup SSH connection on channel: " + ctx.channel(), e);
+        disconnect(ctx, ctx.newPromise());
+
+        // If the promise is not yet done, we have failed with initial connect and set connectPromise to failure
+        if(!connectPromise.isDone()) {
+            connectPromise.setFailure(e);
+        }
     }
 
     @Override
@@ -185,6 +187,15 @@ public class AsyncSshHandler extends ChannelOutboundHandlerAdapter {
 
     @Override
     public synchronized void disconnect(final ChannelHandlerContext ctx, final ChannelPromise promise) {
+        // Super disconnect is necessary in this case since we are using NioSocketChannel and it needs to cleanup its resources
+        // e.g. Socket that it tries to open in its constructor (https://bugs.opendaylight.org/show_bug.cgi?id=2430)
+        // TODO better solution would be to implement custom ChannelFactory + Channel that will use mina SSH lib internally: port this to custom channel implementation
+        try {
+            super.disconnect(ctx, ctx.newPromise());
+        } catch (final Exception e) {
+            LOG.warn("Unable to cleanup all resources for channel: {}. Ignoring.", ctx.channel(), e);
+        }
+
         if(sshReadAsyncListener != null) {
             sshReadAsyncListener.close();
         }
@@ -205,11 +216,15 @@ public class AsyncSshHandler extends ChannelOutboundHandlerAdapter {
             });
         }
 
+        // If we have already succeeded and the session was dropped after, we need to fire inactive to notify reconnect logic
+        if(connectPromise.isSuccess()) {
+            ctx.fireChannelInactive();
+        }
+
         channel = null;
-        promise.setSuccess();
 
+        promise.setSuccess();
         LOG.debug("SSH session closed on channel: {}", ctx.channel());
-        ctx.fireChannelInactive();
     }
 
 }
index 73f2287..dfca1b8 100644 (file)
@@ -8,7 +8,6 @@
 
 package org.opendaylight.controller.netconf.nettyutil.handler.ssh.client;
 
-import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyBoolean;
 import static org.mockito.Matchers.anyObject;
@@ -18,9 +17,9 @@ import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.verifyNoMoreInteractions;
 import static org.mockito.Mockito.verifyZeroInteractions;
 
 import com.google.common.util.concurrent.FutureCallback;
@@ -30,8 +29,10 @@ import com.google.common.util.concurrent.SettableFuture;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
 import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelPromise;
+import io.netty.channel.DefaultChannelPromise;
 import java.io.IOException;
 import java.net.SocketAddress;
 import org.apache.sshd.ClientChannel;
@@ -143,6 +144,7 @@ public class AsyncSshHandlerTest {
         doReturn(ctx).when(ctx).fireChannelActive();
         doReturn(ctx).when(ctx).fireChannelInactive();
         doReturn(ctx).when(ctx).fireChannelRead(anyObject());
+        doReturn(mock(ChannelFuture.class)).when(ctx).disconnect(any(ChannelPromise.class));
         doReturn(getMockedPromise()).when(ctx).newPromise();
     }
 
@@ -179,7 +181,6 @@ public class AsyncSshHandlerTest {
         verify(subsystemChannel).setStreaming(ClientChannel.Streaming.Async);
 
         verify(promise).setSuccess();
-        verifyNoMoreInteractions(promise);
         verify(ctx).fireChannelActive();
     }
 
@@ -533,14 +534,8 @@ public class AsyncSshHandlerTest {
 
         verify(subsystemChannel).setStreaming(ClientChannel.Streaming.Async);
 
-        try {
-            sshChannelOpenListener.operationComplete(getFailedOpenFuture());
-            fail("Exception expected");
-        } catch (final Exception e) {
-            verify(promise).setFailure(any(Throwable.class));
-            verifyNoMoreInteractions(promise);
-            // TODO should ctx.channelInactive be called if we throw exception ?
-        }
+        sshChannelOpenListener.operationComplete(getFailedOpenFuture());
+        verify(promise).setFailure(any(Throwable.class));
     }
 
     @Test
@@ -555,14 +550,8 @@ public class AsyncSshHandlerTest {
 
         final AuthFuture authFuture = getFailedAuthFuture();
 
-        try {
-            sshAuthListener.operationComplete(authFuture);
-            fail("Exception expected");
-        } catch (final Exception e) {
-            verify(promise).setFailure(any(Throwable.class));
-            verifyNoMoreInteractions(promise);
-            // TODO should ctx.channelInactive be called ?
-        }
+        sshAuthListener.operationComplete(authFuture);
+        verify(promise).setFailure(any(Throwable.class));
     }
 
     private AuthFuture getFailedAuthFuture() {
@@ -584,14 +573,8 @@ public class AsyncSshHandlerTest {
         asyncSshHandler.connect(ctx, remoteAddress, localAddress, promise);
 
         final ConnectFuture connectFuture = getFailedConnectFuture();
-        try {
-            sshConnectListener.operationComplete(connectFuture);
-            fail("Exception expected");
-        } catch (final Exception e) {
-            verify(promise).setFailure(any(Throwable.class));
-            verifyNoMoreInteractions(promise);
-            // TODO should ctx.channelInactive be called ?
-        }
+        sshConnectListener.operationComplete(connectFuture);
+        verify(promise).setFailure(any(Throwable.class));
     }
 
     private ConnectFuture getFailedConnectFuture() {
@@ -602,10 +585,7 @@ public class AsyncSshHandlerTest {
     }
 
     private ChannelPromise getMockedPromise() {
-        final ChannelPromise promise = mock(ChannelPromise.class);
-        doReturn(promise).when(promise).setSuccess();
-        doReturn(promise).when(promise).setFailure(any(Throwable.class));
-        return promise;
+        return spy(new DefaultChannelPromise(channel));
     }
 
     private static abstract class SuccessFutureListener<T extends SshFuture<T>> implements FutureCallback<SshFutureListener<T>> {
index 2802488..72534e2 100644 (file)
@@ -136,7 +136,7 @@ public class SSHTest {
             Thread.sleep(100);
         }
         assertFalse(echoClientHandler.isConnected());
-        assertEquals(State.CONNECTION_CLOSED, echoClientHandler.getState());
+        assertEquals(State.FAILED_TO_CONNECT, echoClientHandler.getState());
     }
 
 }