From: Maros Marsalek Date: Thu, 11 Feb 2016 18:49:50 +0000 (+0000) Subject: Merge "Bug 2806 - Immediate and infinite reconnect attempts during negotiation" into... X-Git-Tag: release/beryllium~4 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=commitdiff_plain;h=fdece89e1115f52dc6b65e8843fbbdacca2c67e7;hp=3fece4db52f1ad0a45ade7dbd74176caff6927e8;p=netconf.git Merge "Bug 2806 - Immediate and infinite reconnect attempts during negotiation" into stable/beryllium --- diff --git a/opendaylight/netconf/netconf-client/src/main/java/org/opendaylight/netconf/client/SshClientChannelInitializer.java b/opendaylight/netconf/netconf-client/src/main/java/org/opendaylight/netconf/client/SshClientChannelInitializer.java index 9060aaa78d..fd335304c0 100644 --- a/opendaylight/netconf/netconf-client/src/main/java/org/opendaylight/netconf/client/SshClientChannelInitializer.java +++ b/opendaylight/netconf/netconf-client/src/main/java/org/opendaylight/netconf/client/SshClientChannelInitializer.java @@ -33,7 +33,7 @@ final class SshClientChannelInitializer extends AbstractChannelInitializer promise) { try { // ssh handler has to be the first handler in pipeline - ch.pipeline().addFirst(AsyncSshHandler.createForNetconfSubsystem(authenticationHandler)); + ch.pipeline().addFirst(AsyncSshHandler.createForNetconfSubsystem(authenticationHandler, promise)); super.initialize(ch,promise); } catch (final IOException e) { throw new RuntimeException(e); diff --git a/opendaylight/netconf/netconf-client/src/main/java/org/opendaylight/netconf/client/TcpClientChannelInitializer.java b/opendaylight/netconf/netconf-client/src/main/java/org/opendaylight/netconf/client/TcpClientChannelInitializer.java index ba258dca70..ca68544561 100644 --- a/opendaylight/netconf/netconf-client/src/main/java/org/opendaylight/netconf/client/TcpClientChannelInitializer.java +++ b/opendaylight/netconf/netconf-client/src/main/java/org/opendaylight/netconf/client/TcpClientChannelInitializer.java @@ -8,7 +8,14 @@ package org.opendaylight.netconf.client; import io.netty.channel.Channel; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelOutboundHandlerAdapter; +import io.netty.channel.ChannelPromise; +import io.netty.channel.DefaultChannelPromise; +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.GenericFutureListener; import io.netty.util.concurrent.Promise; +import java.net.SocketAddress; import org.opendaylight.netconf.nettyutil.AbstractChannelInitializer; import org.opendaylight.protocol.framework.SessionListenerFactory; @@ -23,6 +30,70 @@ class TcpClientChannelInitializer extends AbstractChannelInitializer promise) { + final Future negotiationFuture = promise; + + //We have to add this channel outbound handler to channel pipeline, in order + //to get notifications from netconf negotiatior. Set connection promise to + //success only after successful negotiation. + ch.pipeline().addFirst(new ChannelOutboundHandlerAdapter() { + ChannelPromise connectPromise; + GenericFutureListener negotiationFutureListener; + + @Override + public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, + final ChannelPromise channelPromise) throws Exception { + connectPromise = channelPromise; + ChannelPromise tcpConnectFuture = new DefaultChannelPromise(ch); + + negotiationFutureListener = new GenericFutureListener>() { + @Override + public void operationComplete(Future future) throws Exception { + if (future.isSuccess()) + connectPromise.setSuccess(); + } + }; + + tcpConnectFuture.addListener(new GenericFutureListener>() { + @Override + public void operationComplete(Future future) throws Exception { + if(future.isSuccess()) { + //complete connection promise with netconf negotiation future + negotiationFuture.addListener(negotiationFutureListener); + } else { + connectPromise.setFailure(future.cause()); + } + } + }); + ctx.connect(remoteAddress, localAddress, tcpConnectFuture); + } + + @Override + public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { + // If we have already succeeded and the session was dropped after, we need to fire inactive to notify reconnect logic + if(connectPromise.isSuccess()) { + ctx.fireChannelInactive(); + } + + //If connection promise is not already set, it means negotiation failed + //we must set connection promise to failure + if(!connectPromise.isDone()) { + connectPromise.setFailure(new IllegalStateException("Negotiation failed")); + } + + //Remove listener from negotiation future, we don't want notifications + //from negotiation anymore + negotiationFuture.removeListener(negotiationFutureListener); + + super.disconnect(ctx, promise); + promise.setSuccess(); + } + }); + + super.initialize(ch, promise); + } + @Override protected void initializeSessionNegotiator(final Channel ch, final Promise promise) { ch.pipeline().addAfter(NETCONF_MESSAGE_DECODER, AbstractChannelInitializer.NETCONF_SESSION_NEGOTIATOR, diff --git a/opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/netconf/nettyutil/AbstractNetconfSessionNegotiator.java b/opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/netconf/nettyutil/AbstractNetconfSessionNegotiator.java index 4bfd94c910..3482c1fbef 100644 --- a/opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/netconf/nettyutil/AbstractNetconfSessionNegotiator.java +++ b/opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/netconf/nettyutil/AbstractNetconfSessionNegotiator.java @@ -23,18 +23,18 @@ import io.netty.util.concurrent.Future; import io.netty.util.concurrent.GenericFutureListener; import io.netty.util.concurrent.Promise; import java.util.concurrent.TimeUnit; -import org.opendaylight.netconf.util.messages.FramingMechanism; -import org.opendaylight.netconf.api.messages.NetconfHelloMessage; import org.opendaylight.netconf.api.NetconfDocumentedException; import org.opendaylight.netconf.api.NetconfMessage; import org.opendaylight.netconf.api.NetconfSessionListener; import org.opendaylight.netconf.api.NetconfSessionPreferences; +import org.opendaylight.netconf.api.messages.NetconfHelloMessage; import org.opendaylight.netconf.api.xml.XmlNetconfConstants; import org.opendaylight.netconf.nettyutil.handler.FramingMechanismHandlerFactory; import org.opendaylight.netconf.nettyutil.handler.NetconfChunkAggregator; import org.opendaylight.netconf.nettyutil.handler.NetconfMessageToXMLEncoder; import org.opendaylight.netconf.nettyutil.handler.NetconfXMLToHelloMessageDecoder; import org.opendaylight.netconf.nettyutil.handler.NetconfXMLToMessageDecoder; +import org.opendaylight.netconf.util.messages.FramingMechanism; import org.opendaylight.protocol.framework.AbstractSessionNegotiator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -125,10 +125,10 @@ public abstract class AbstractNetconfSessionNegotiator

() { + channel.close().addListener(new GenericFutureListener() { @Override public void operationComplete(final ChannelFuture future) throws Exception { if(future.isSuccess()) { diff --git a/opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/netconf/nettyutil/handler/ssh/client/AsyncSshHandler.java b/opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/netconf/nettyutil/handler/ssh/client/AsyncSshHandler.java index 6a1baacea9..ca654c6fdb 100644 --- a/opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/netconf/nettyutil/handler/ssh/client/AsyncSshHandler.java +++ b/opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/netconf/nettyutil/handler/ssh/client/AsyncSshHandler.java @@ -13,6 +13,8 @@ import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelOutboundHandlerAdapter; import io.netty.channel.ChannelPromise; +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.GenericFutureListener; import java.io.IOException; import java.net.SocketAddress; import java.util.HashMap; @@ -33,13 +35,10 @@ import org.slf4j.LoggerFactory; */ public class AsyncSshHandler extends ChannelOutboundHandlerAdapter { - private static final Logger LOG = LoggerFactory.getLogger(AsyncSshHandler.class); public static final String SUBSYSTEM = "netconf"; - public static final SshClient DEFAULT_CLIENT = SshClient.setUpDefaultClient(); - public static final int SSH_DEFAULT_NIO_WORKERS = 8; - + private static final Logger LOG = LoggerFactory.getLogger(AsyncSshHandler.class); // Disable default timeouts from mina sshd private static final long DEFAULT_TIMEOUT = -1L; @@ -57,6 +56,7 @@ public class AsyncSshHandler extends ChannelOutboundHandlerAdapter { private final AuthenticationHandler authenticationHandler; private final SshClient sshClient; + private Future negotiationFuture; private AsyncSshHandlerReader sshReadAsyncListener; private AsyncSshHandlerWriter sshWriteAsyncHandler; @@ -64,10 +64,12 @@ public class AsyncSshHandler extends ChannelOutboundHandlerAdapter { private ClientChannel channel; private ClientSession session; private ChannelPromise connectPromise; + private GenericFutureListener negotiationFutureListener; - public static AsyncSshHandler createForNetconfSubsystem(final AuthenticationHandler authenticationHandler) throws IOException { - return new AsyncSshHandler(authenticationHandler, DEFAULT_CLIENT); + public AsyncSshHandler(final AuthenticationHandler authenticationHandler, final SshClient sshClient, final Future negotiationFuture) throws IOException { + this(authenticationHandler, sshClient); + this.negotiationFuture = negotiationFuture; } /** @@ -83,6 +85,24 @@ public class AsyncSshHandler extends ChannelOutboundHandlerAdapter { sshClient.start(); } + public static AsyncSshHandler createForNetconfSubsystem(final AuthenticationHandler authenticationHandler) throws IOException { + return new AsyncSshHandler(authenticationHandler, DEFAULT_CLIENT); + } + + /** + * + * Create AsyncSshHandler for netconf subsystem. Negotiation future has to be set to success after successful netconf + * negotiation. + * + * @param authenticationHandler + * @param negotiationFuture + * @return + * @throws IOException + */ + public static AsyncSshHandler createForNetconfSubsystem(final AuthenticationHandler authenticationHandler, final Future negotiationFuture) throws IOException { + return new AsyncSshHandler(authenticationHandler, DEFAULT_CLIENT, negotiationFuture); + } + private void startSsh(final ChannelHandlerContext ctx, final SocketAddress address) { LOG.debug("Starting SSH to {} on channel: {}", address, ctx.channel()); @@ -150,7 +170,9 @@ public class AsyncSshHandler extends ChannelOutboundHandlerAdapter { private synchronized void handleSshChanelOpened(final ChannelHandlerContext ctx) { LOG.trace("SSH subsystem channel opened successfully on channel: {}", ctx.channel()); - connectPromise.setSuccess(); + if(negotiationFuture == null) { + connectPromise.setSuccess(); + } // TODO we should also read from error stream and at least log from that @@ -175,12 +197,13 @@ public class AsyncSshHandler extends ChannelOutboundHandlerAdapter { private synchronized void handleSshSetupFailure(final ChannelHandlerContext ctx, final Throwable e) { LOG.warn("Unable to setup SSH connection on channel: {}", ctx.channel(), e); - disconnect(ctx, ctx.newPromise()); // If the promise is not yet done, we have failed with initial connect and set connectPromise to failure if(!connectPromise.isDone()) { connectPromise.setFailure(e); } + + disconnect(ctx, ctx.newPromise()); } @Override @@ -192,6 +215,19 @@ public class AsyncSshHandler extends ChannelOutboundHandlerAdapter { public synchronized void connect(final ChannelHandlerContext ctx, final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) throws Exception { LOG.debug("SSH session connecting on channel {}. promise: {} ", ctx.channel(), connectPromise); this.connectPromise = promise; + + if(negotiationFuture != null) { + + negotiationFutureListener = new GenericFutureListener>() { + @Override + public void operationComplete(Future future) throws Exception { + if (future.isSuccess()) + connectPromise.setSuccess(); + } + }; + //complete connection promise with netconf negotiation future + negotiationFuture.addListener(negotiationFutureListener); + } startSsh(ctx, remoteAddress); } @@ -217,6 +253,18 @@ public class AsyncSshHandler extends ChannelOutboundHandlerAdapter { sshReadAsyncListener.close(); } + //If connection promise is not already set, it means negotiation failed + //we must set connection promise to failure + if(!connectPromise.isDone()) { + connectPromise.setFailure(new IllegalStateException("Negotiation failed")); + } + + //Remove listener from negotiation future, we don't want notifications + //from negotiation anymore + if(negotiationFuture != null) { + negotiationFuture.removeListener(negotiationFutureListener); + } + if(session!= null && !session.isClosed() && !session.isClosing()) { session.close(false).addListener(new SshFutureListener() { @Override diff --git a/opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/netconf/nettyutil/handler/ssh/client/AsyncSshHandlerReader.java b/opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/netconf/nettyutil/handler/ssh/client/AsyncSshHandlerReader.java index 90c5e4d487..a6da457153 100644 --- a/opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/netconf/nettyutil/handler/ssh/client/AsyncSshHandlerReader.java +++ b/opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/netconf/nettyutil/handler/ssh/client/AsyncSshHandlerReader.java @@ -47,6 +47,12 @@ public final class AsyncSshHandlerReader implements SshFutureListener