X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=netconf%2Fnetconf-netty-util%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fnetconf%2Fnettyutil%2FAbstractNetconfSessionNegotiator.java;h=a5d65a37c9959fdf58558a935ad5ced2d44d2de4;hb=816bd41f7e713627faa114a9eee843bf8e02b03d;hp=63c6668a1a9637261e9a707e2df9ff50606f56b4;hpb=4eb7c36e28e610ee78b58e52f43af6c5527c9ebe;p=netconf.git diff --git a/netconf/netconf-netty-util/src/main/java/org/opendaylight/netconf/nettyutil/AbstractNetconfSessionNegotiator.java b/netconf/netconf-netty-util/src/main/java/org/opendaylight/netconf/nettyutil/AbstractNetconfSessionNegotiator.java index 63c6668a1a..a5d65a37c9 100644 --- a/netconf/netconf-netty-util/src/main/java/org/opendaylight/netconf/nettyutil/AbstractNetconfSessionNegotiator.java +++ b/netconf/netconf-netty-util/src/main/java/org/opendaylight/netconf/nettyutil/AbstractNetconfSessionNegotiator.java @@ -7,42 +7,45 @@ */ package org.opendaylight.netconf.nettyutil; +import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkState; import static java.util.Objects.requireNonNull; -import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; +import com.google.common.annotations.Beta; import io.netty.channel.Channel; -import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.handler.ssl.SslHandler; import io.netty.util.Timeout; import io.netty.util.Timer; -import io.netty.util.concurrent.GenericFutureListener; +import io.netty.util.concurrent.Future; import io.netty.util.concurrent.Promise; -import java.util.Optional; import java.util.concurrent.TimeUnit; +import org.checkerframework.checker.index.qual.NonNegative; import org.checkerframework.checker.lock.qual.GuardedBy; +import org.checkerframework.checker.lock.qual.Holding; +import org.eclipse.jdt.annotation.NonNull; +import org.eclipse.jdt.annotation.Nullable; +import org.opendaylight.netconf.api.CapabilityURN; +import org.opendaylight.netconf.api.NamespaceURN; import org.opendaylight.netconf.api.NetconfDocumentedException; -import org.opendaylight.netconf.api.NetconfMessage; import org.opendaylight.netconf.api.NetconfSessionListener; -import org.opendaylight.netconf.api.NetconfSessionPreferences; -import org.opendaylight.netconf.api.messages.NetconfHelloMessage; +import org.opendaylight.netconf.api.messages.HelloMessage; +import org.opendaylight.netconf.api.messages.NetconfMessage; import org.opendaylight.netconf.api.xml.XmlNetconfConstants; -import org.opendaylight.netconf.nettyutil.handler.FramingMechanismHandlerFactory; +import org.opendaylight.netconf.nettyutil.handler.ChunkedFramingMechanismEncoder; import org.opendaylight.netconf.nettyutil.handler.NetconfChunkAggregator; import org.opendaylight.netconf.nettyutil.handler.NetconfMessageToXMLEncoder; import org.opendaylight.netconf.nettyutil.handler.NetconfXMLToHelloMessageDecoder; import org.opendaylight.netconf.nettyutil.handler.NetconfXMLToMessageDecoder; -import org.opendaylight.netconf.util.messages.FramingMechanism; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.w3c.dom.Document; import org.w3c.dom.NodeList; -public abstract class AbstractNetconfSessionNegotiator

, L extends NetconfSessionListener> +public abstract class AbstractNetconfSessionNegotiator, + L extends NetconfSessionListener> extends ChannelInboundHandlerAdapter implements NetconfSessionNegotiator { /** * Possible states for Finite State Machine. @@ -52,40 +55,68 @@ public abstract class AbstractNetconfSessionNegotiator

promise; private final L sessionListener; private final Timer timer; + @GuardedBy("this") private Timeout timeoutTask; - @GuardedBy("this") private State state = State.IDLE; - protected AbstractNetconfSessionNegotiator(final P sessionPreferences, final Promise promise, - final Channel channel, final Timer timer, - final L sessionListener, final long connectionTimeoutMillis) { - this.channel = requireNonNull(channel); + protected AbstractNetconfSessionNegotiator(final HelloMessage hello, final Promise promise, + final Channel channel, final Timer timer, final L sessionListener, + final long connectionTimeoutMillis, + final @NonNegative int maximumIncomingChunkSize) { + localHello = requireNonNull(hello); this.promise = requireNonNull(promise); - this.sessionPreferences = sessionPreferences; + this.channel = requireNonNull(channel); this.timer = timer; this.sessionListener = sessionListener; this.connectionTimeoutMillis = connectionTimeoutMillis; + this.maximumIncomingChunkSize = maximumIncomingChunkSize; + checkArgument(maximumIncomingChunkSize > 0, "Invalid maximum incoming chunk size %s", maximumIncomingChunkSize); + } + + protected final @NonNull HelloMessage localHello() { + return localHello; } protected final void startNegotiation() { if (ifNegotiatedAlready()) { LOG.debug("Negotiation on channel {} already started", channel); } else { - final Optional sslHandler = getSslHandler(channel); - if (sslHandler.isPresent()) { - sslHandler.get().handshakeFuture().addListener(future -> { + final var sslHandler = getSslHandler(channel); + if (sslHandler != null) { + sslHandler.handshakeFuture().addListener(future -> { checkState(future.isSuccess(), "Ssl handshake was not successful"); LOG.debug("Ssl handshake complete"); start(); @@ -96,67 +127,121 @@ public abstract class AbstractNetconfSessionNegotiator

getSslHandler(final Channel channel) { - return Optional.ofNullable(channel.pipeline().get(SslHandler.class)); + private synchronized State state() { + return state; } - public P getSessionPreferences() { - return sessionPreferences; + private static @Nullable SslHandler getSslHandler(final Channel channel) { + return channel.pipeline().get(SslHandler.class); } private void start() { - final NetconfHelloMessage helloMessage = this.sessionPreferences.getHelloMessage(); - LOG.debug("Session negotiation started with hello message {} on channel {}", helloMessage, channel); + LOG.debug("Sending negotiation proposal {} on channel {}", localHello, channel); + + // Send the message out, but to not run listeners just yet, as we have some more state transitions to go through + final var helloFuture = channel.writeAndFlush(localHello); + + // Quick check: if the future has already failed we call it quits before negotiation even started + final var helloCause = helloFuture.cause(); + if (helloCause != null) { + LOG.warn("Failed to send negotiation proposal on channel {}", channel, helloCause); + failAndClose(); + return; + } + + // Catch any exceptions from this point on. Use a named class to ease debugging. + final class ExceptionHandlingInboundChannelHandler extends ChannelInboundHandlerAdapter { + @Override + public void exceptionCaught(final ChannelHandlerContext ctx, final Throwable cause) { + LOG.warn("An exception occurred during negotiation with {} on channel {}", + channel.remoteAddress(), channel, cause); + // FIXME: this is quite suspect as it is competing with timeoutExpired() without synchronization + cancelTimeout(); + negotiationFailed(cause); + changeState(State.FAILED); + } + } channel.pipeline().addLast(NAME_OF_EXCEPTION_HANDLER, new ExceptionHandlingInboundChannelHandler()); - sendMessage(helloMessage); + // Remove special outbound handler for hello message. Insert regular netconf xml message (en|de)coders. + replaceChannelHandler(channel, AbstractChannelInitializer.NETCONF_MESSAGE_ENCODER, + new NetconfMessageToXMLEncoder()); + + synchronized (this) { + lockedChangeState(State.OPEN_WAIT); - replaceHelloMessageOutboundHandler(); - changeState(State.OPEN_WAIT); + // Service the timeout on channel's eventloop, so that we do not get state transition problems + timeoutTask = timer.newTimeout(unused -> channel.eventLoop().execute(this::timeoutExpired), + connectionTimeoutMillis, TimeUnit.MILLISECONDS); + } + + LOG.debug("Session negotiation started on channel {}", channel); + + // State transition completed, now run any additional processing + helloFuture.addListener(this::onHelloWriteComplete); + } - timeoutTask = this.timer.newTimeout(this::timeoutExpired, connectionTimeoutMillis, TimeUnit.MILLISECONDS); + private void onHelloWriteComplete(final Future future) { + final var cause = future.cause(); + if (cause != null) { + LOG.info("Failed to send message {} on channel {}", localHello, channel, cause); + negotiationFailed(cause); + } else { + LOG.trace("Message {} sent to socket on channel {}", localHello, channel); + } } - private synchronized void timeoutExpired(final Timeout timeout) { + private synchronized void timeoutExpired() { + if (timeoutTask == null) { + // cancelTimeout() between expiry and execution on the loop + return; + } + timeoutTask = null; + if (state != State.ESTABLISHED) { - LOG.debug("Connection timeout after {}, session backed by channel {} is in state {}", timeout, channel, - state); + LOG.debug("Connection timeout after {}ms, session backed by channel {} is in state {}", + connectionTimeoutMillis, channel, state); // Do not fail negotiation if promise is done or canceled // It would result in setting result of the promise second time and that throws exception if (!promise.isDone() && !promise.isCancelled()) { LOG.warn("Netconf session backed by channel {} was not established after {}", channel, connectionTimeoutMillis); - changeState(State.FAILED); - - channel.close().addListener((GenericFutureListener) future -> { - if (future.isSuccess()) { - LOG.debug("Channel {} closed: success", future.channel()); - } else { - LOG.warn("Channel {} closed: fail", future.channel()); - } - }); + failAndClose(); } } else if (channel.isOpen()) { channel.pipeline().remove(NAME_OF_EXCEPTION_HANDLER); } } - @SuppressFBWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD", - justification = "https://github.com/spotbugs/spotbugs/issues/811") - private void cancelTimeout() { - if (timeoutTask != null) { - timeoutTask.cancel(); + private void failAndClose() { + changeState(State.FAILED); + channel.close().addListener(this::onChannelClosed); + } + + private void onChannelClosed(final Future future) { + final var cause = future.cause(); + if (cause != null) { + LOG.warn("Channel {} closed: fail", channel, cause); + } else { + LOG.debug("Channel {} closed: success", channel); + } + } + + private synchronized void cancelTimeout() { + if (timeoutTask != null && !timeoutTask.cancel()) { + // Late-coming cancel: make sure the task does not actually run + timeoutTask = null; } } - protected final S getSessionForHelloMessage(final NetconfHelloMessage netconfMessage) + protected final S getSessionForHelloMessage(final HelloMessage netconfMessage) throws NetconfDocumentedException { final Document doc = netconfMessage.getDocument(); @@ -168,19 +253,21 @@ public abstract class AbstractNetconfSessionNegotiator

{ - if (!f.isSuccess()) { - LOG.info("Failed to send message {} on channel {}", msg, channel, f.cause()); - negotiationFailed(f.cause()); - } else { - LOG.trace("Message {} sent to socket on channel {}", msg, channel); - } - }); - } - @Override @SuppressWarnings("checkstyle:illegalCatch") public final void channelActive(final ChannelHandlerContext ctx) { @@ -317,9 +360,14 @@ public abstract class AbstractNetconfSessionNegotiator