Merge "Bug 2806 - Immediate and infinite reconnect attempts during negotiation" into...
authorMaros Marsalek <mmarsale@cisco.com>
Thu, 11 Feb 2016 18:49:50 +0000 (18:49 +0000)
committerGerrit Code Review <gerrit@opendaylight.org>
Thu, 11 Feb 2016 18:49:50 +0000 (18:49 +0000)
opendaylight/netconf/netconf-client/src/main/java/org/opendaylight/netconf/client/SshClientChannelInitializer.java
opendaylight/netconf/netconf-client/src/main/java/org/opendaylight/netconf/client/TcpClientChannelInitializer.java
opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/netconf/nettyutil/AbstractNetconfSessionNegotiator.java
opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/netconf/nettyutil/handler/ssh/client/AsyncSshHandler.java
opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/netconf/nettyutil/handler/ssh/client/AsyncSshHandlerReader.java

index 9060aaa78de7c5519cf6834455f84964f06d1243..fd335304c0f1cee5d1b3345d0488c19cc8db49f6 100644 (file)
@@ -33,7 +33,7 @@ final class SshClientChannelInitializer extends AbstractChannelInitializer<Netco
     public void initialize(final Channel ch, final Promise<NetconfClientSession> promise) {
         try {
             // ssh handler has to be the first handler in pipeline
-            ch.pipeline().addFirst(AsyncSshHandler.createForNetconfSubsystem(authenticationHandler));
+            ch.pipeline().addFirst(AsyncSshHandler.createForNetconfSubsystem(authenticationHandler, promise));
             super.initialize(ch,promise);
         } catch (final IOException e) {
             throw new RuntimeException(e);
index ba258dca7055a006ff36f9e9f06fc9ad5fba0c82..ca685445619ed4f17cf7210c07824eb80a4860c8 100644 (file)
@@ -8,7 +8,14 @@
 package org.opendaylight.netconf.client;
 
 import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelOutboundHandlerAdapter;
+import io.netty.channel.ChannelPromise;
+import io.netty.channel.DefaultChannelPromise;
+import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.GenericFutureListener;
 import io.netty.util.concurrent.Promise;
+import java.net.SocketAddress;
 import org.opendaylight.netconf.nettyutil.AbstractChannelInitializer;
 import org.opendaylight.protocol.framework.SessionListenerFactory;
 
@@ -23,6 +30,70 @@ class TcpClientChannelInitializer extends AbstractChannelInitializer<NetconfClie
         this.sessionListener = sessionListener;
     }
 
+    @Override
+    public void initialize(final Channel ch, Promise<NetconfClientSession> promise) {
+        final Future negotiationFuture = promise;
+
+        //We have to add this channel outbound handler to channel pipeline, in order
+        //to get notifications from netconf negotiatior. Set connection promise to
+        //success only after successful negotiation.
+        ch.pipeline().addFirst(new ChannelOutboundHandlerAdapter() {
+            ChannelPromise connectPromise;
+            GenericFutureListener negotiationFutureListener;
+
+            @Override
+            public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress,
+                                final ChannelPromise channelPromise) throws Exception {
+                connectPromise = channelPromise;
+                ChannelPromise tcpConnectFuture = new DefaultChannelPromise(ch);
+
+                negotiationFutureListener = new GenericFutureListener<Future<Object>>() {
+                    @Override
+                    public void operationComplete(Future future) throws Exception {
+                        if (future.isSuccess())
+                            connectPromise.setSuccess();
+                    }
+                };
+
+                tcpConnectFuture.addListener(new GenericFutureListener<Future<? super Void>>() {
+                    @Override
+                    public void operationComplete(Future<? super Void> future) throws Exception {
+                        if(future.isSuccess()) {
+                            //complete connection promise with netconf negotiation future
+                            negotiationFuture.addListener(negotiationFutureListener);
+                        } else {
+                            connectPromise.setFailure(future.cause());
+                        }
+                    }
+                });
+                ctx.connect(remoteAddress, localAddress, tcpConnectFuture);
+            }
+
+            @Override
+            public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
+                // If we have already succeeded and the session was dropped after, we need to fire inactive to notify reconnect logic
+                if(connectPromise.isSuccess()) {
+                    ctx.fireChannelInactive();
+                }
+
+                //If connection promise is not already set, it means negotiation failed
+                //we must set connection promise to failure
+                if(!connectPromise.isDone()) {
+                    connectPromise.setFailure(new IllegalStateException("Negotiation failed"));
+                }
+
+                //Remove listener from negotiation future, we don't want notifications
+                //from negotiation anymore
+                negotiationFuture.removeListener(negotiationFutureListener);
+
+                super.disconnect(ctx, promise);
+                promise.setSuccess();
+            }
+        });
+
+        super.initialize(ch, promise);
+    }
+
     @Override
     protected void initializeSessionNegotiator(final Channel ch, final Promise<NetconfClientSession> promise) {
         ch.pipeline().addAfter(NETCONF_MESSAGE_DECODER, AbstractChannelInitializer.NETCONF_SESSION_NEGOTIATOR,
index 4bfd94c9109db41b47d756eef5f3d29e4ea17656..3482c1fbef34e4882b753e0cd3949f189101709f 100644 (file)
@@ -23,18 +23,18 @@ import io.netty.util.concurrent.Future;
 import io.netty.util.concurrent.GenericFutureListener;
 import io.netty.util.concurrent.Promise;
 import java.util.concurrent.TimeUnit;
-import org.opendaylight.netconf.util.messages.FramingMechanism;
-import org.opendaylight.netconf.api.messages.NetconfHelloMessage;
 import org.opendaylight.netconf.api.NetconfDocumentedException;
 import org.opendaylight.netconf.api.NetconfMessage;
 import org.opendaylight.netconf.api.NetconfSessionListener;
 import org.opendaylight.netconf.api.NetconfSessionPreferences;
+import org.opendaylight.netconf.api.messages.NetconfHelloMessage;
 import org.opendaylight.netconf.api.xml.XmlNetconfConstants;
 import org.opendaylight.netconf.nettyutil.handler.FramingMechanismHandlerFactory;
 import org.opendaylight.netconf.nettyutil.handler.NetconfChunkAggregator;
 import org.opendaylight.netconf.nettyutil.handler.NetconfMessageToXMLEncoder;
 import org.opendaylight.netconf.nettyutil.handler.NetconfXMLToHelloMessageDecoder;
 import org.opendaylight.netconf.nettyutil.handler.NetconfXMLToMessageDecoder;
+import org.opendaylight.netconf.util.messages.FramingMechanism;
 import org.opendaylight.protocol.framework.AbstractSessionNegotiator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -125,10 +125,10 @@ public abstract class AbstractNetconfSessionNegotiator<P extends NetconfSessionP
                         // Do not fail negotiation if promise is done or canceled
                         // It would result in setting result of the promise second time and that throws exception
                         if (isPromiseFinished() == false) {
-                            negotiationFailed(new IllegalStateException("Session was not established after " + timeout));
+                            LOG.warn("Netconf session was not established after {}", connectionTimeoutMillis);
                             changeState(State.FAILED);
 
-                            channel.closeFuture().addListener(new GenericFutureListener<ChannelFuture>() {
+                            channel.close().addListener(new GenericFutureListener<ChannelFuture>() {
                                 @Override
                                 public void operationComplete(final ChannelFuture future) throws Exception {
                                     if(future.isSuccess()) {
index 6a1baacea9a9bfa483bae1132f9760b59491296e..ca654c6fdb6c6796a4c663564fdadbdd409d62ca 100644 (file)
@@ -13,6 +13,8 @@ import io.netty.buffer.ByteBuf;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelOutboundHandlerAdapter;
 import io.netty.channel.ChannelPromise;
+import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.GenericFutureListener;
 import java.io.IOException;
 import java.net.SocketAddress;
 import java.util.HashMap;
@@ -33,13 +35,10 @@ import org.slf4j.LoggerFactory;
  */
 public class AsyncSshHandler extends ChannelOutboundHandlerAdapter {
 
-    private static final Logger LOG = LoggerFactory.getLogger(AsyncSshHandler.class);
     public static final String SUBSYSTEM = "netconf";
-
     public static final SshClient DEFAULT_CLIENT = SshClient.setUpDefaultClient();
-
     public static final int SSH_DEFAULT_NIO_WORKERS = 8;
-
+    private static final Logger LOG = LoggerFactory.getLogger(AsyncSshHandler.class);
     // Disable default timeouts from mina sshd
     private static final long DEFAULT_TIMEOUT = -1L;
 
@@ -57,6 +56,7 @@ public class AsyncSshHandler extends ChannelOutboundHandlerAdapter {
 
     private final AuthenticationHandler authenticationHandler;
     private final SshClient sshClient;
+    private Future negotiationFuture;
 
     private AsyncSshHandlerReader sshReadAsyncListener;
     private AsyncSshHandlerWriter sshWriteAsyncHandler;
@@ -64,10 +64,12 @@ public class AsyncSshHandler extends ChannelOutboundHandlerAdapter {
     private ClientChannel channel;
     private ClientSession session;
     private ChannelPromise connectPromise;
+    private GenericFutureListener negotiationFutureListener;
 
 
-    public static AsyncSshHandler createForNetconfSubsystem(final AuthenticationHandler authenticationHandler) throws IOException {
-        return new AsyncSshHandler(authenticationHandler, DEFAULT_CLIENT);
+    public AsyncSshHandler(final AuthenticationHandler authenticationHandler, final SshClient sshClient, final Future negotiationFuture) throws IOException {
+        this(authenticationHandler, sshClient);
+        this.negotiationFuture = negotiationFuture;
     }
 
     /**
@@ -83,6 +85,24 @@ public class AsyncSshHandler extends ChannelOutboundHandlerAdapter {
         sshClient.start();
     }
 
+    public static AsyncSshHandler createForNetconfSubsystem(final AuthenticationHandler authenticationHandler) throws IOException {
+        return new AsyncSshHandler(authenticationHandler, DEFAULT_CLIENT);
+    }
+
+    /**
+     *
+     * Create AsyncSshHandler for netconf subsystem. Negotiation future has to be set to success after successful netconf
+     * negotiation.
+     *
+     * @param authenticationHandler
+     * @param negotiationFuture
+     * @return
+     * @throws IOException
+     */
+    public static AsyncSshHandler createForNetconfSubsystem(final AuthenticationHandler authenticationHandler, final Future negotiationFuture) throws IOException {
+        return new AsyncSshHandler(authenticationHandler, DEFAULT_CLIENT, negotiationFuture);
+    }
+
     private void startSsh(final ChannelHandlerContext ctx, final SocketAddress address) {
         LOG.debug("Starting SSH to {} on channel: {}", address, ctx.channel());
 
@@ -150,7 +170,9 @@ public class AsyncSshHandler extends ChannelOutboundHandlerAdapter {
     private synchronized void handleSshChanelOpened(final ChannelHandlerContext ctx) {
         LOG.trace("SSH subsystem channel opened successfully on channel: {}", ctx.channel());
 
-        connectPromise.setSuccess();
+        if(negotiationFuture == null) {
+            connectPromise.setSuccess();
+        }
 
         // TODO we should also read from error stream and at least log from that
 
@@ -175,12 +197,13 @@ public class AsyncSshHandler extends ChannelOutboundHandlerAdapter {
 
     private synchronized void handleSshSetupFailure(final ChannelHandlerContext ctx, final Throwable e) {
         LOG.warn("Unable to setup SSH connection on channel: {}", ctx.channel(), e);
-        disconnect(ctx, ctx.newPromise());
 
         // If the promise is not yet done, we have failed with initial connect and set connectPromise to failure
         if(!connectPromise.isDone()) {
             connectPromise.setFailure(e);
         }
+
+        disconnect(ctx, ctx.newPromise());
     }
 
     @Override
@@ -192,6 +215,19 @@ public class AsyncSshHandler extends ChannelOutboundHandlerAdapter {
     public synchronized void connect(final ChannelHandlerContext ctx, final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) throws Exception {
         LOG.debug("SSH session connecting on channel {}. promise: {} ", ctx.channel(), connectPromise);
         this.connectPromise = promise;
+
+        if(negotiationFuture != null) {
+
+            negotiationFutureListener = new GenericFutureListener<Future<Object>>() {
+                @Override
+                public void operationComplete(Future future) throws Exception {
+                    if (future.isSuccess())
+                        connectPromise.setSuccess();
+                }
+            };
+            //complete connection promise with netconf negotiation future
+            negotiationFuture.addListener(negotiationFutureListener);
+        }
         startSsh(ctx, remoteAddress);
     }
 
@@ -217,6 +253,18 @@ public class AsyncSshHandler extends ChannelOutboundHandlerAdapter {
             sshReadAsyncListener.close();
         }
 
+        //If connection promise is not already set, it means negotiation failed
+        //we must set connection promise to failure
+        if(!connectPromise.isDone()) {
+            connectPromise.setFailure(new IllegalStateException("Negotiation failed"));
+        }
+
+        //Remove listener from negotiation future, we don't want notifications
+        //from negotiation anymore
+        if(negotiationFuture != null) {
+            negotiationFuture.removeListener(negotiationFutureListener);
+        }
+
         if(session!= null && !session.isClosed() && !session.isClosing()) {
             session.close(false).addListener(new SshFutureListener<CloseFuture>() {
                 @Override
index 90c5e4d4870bc61d6f888549a33290f74c8e797f..a6da457153b6ea98321da9f220cdecceed705cea 100644 (file)
@@ -47,6 +47,12 @@ public final class AsyncSshHandlerReader implements SshFutureListener<IoReadFutu
     @Override
     public synchronized void operationComplete(final IoReadFuture future) {
         if(future.getException() != null) {
+
+            //if asyncout is already set to null by close method, do nothing
+            if(asyncOut == null) {
+                return;
+            }
+
             if(asyncOut.isClosed() || asyncOut.isClosing()) {
                 // Ssh dropped
                 LOG.debug("Ssh session dropped on channel: {}", channelId, future.getException());