X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=netconf%2Fnetconf-netty-util%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fnetconf%2Fnettyutil%2FAbstractNetconfSessionNegotiator.java;h=a5d65a37c9959fdf58558a935ad5ced2d44d2de4;hb=816bd41f7e713627faa114a9eee843bf8e02b03d;hp=63c6668a1a9637261e9a707e2df9ff50606f56b4;hpb=4eb7c36e28e610ee78b58e52f43af6c5527c9ebe;p=netconf.git diff --git a/netconf/netconf-netty-util/src/main/java/org/opendaylight/netconf/nettyutil/AbstractNetconfSessionNegotiator.java b/netconf/netconf-netty-util/src/main/java/org/opendaylight/netconf/nettyutil/AbstractNetconfSessionNegotiator.java index 63c6668a1a..a5d65a37c9 100644 --- a/netconf/netconf-netty-util/src/main/java/org/opendaylight/netconf/nettyutil/AbstractNetconfSessionNegotiator.java +++ b/netconf/netconf-netty-util/src/main/java/org/opendaylight/netconf/nettyutil/AbstractNetconfSessionNegotiator.java @@ -7,42 +7,45 @@ */ package org.opendaylight.netconf.nettyutil; +import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkState; import static java.util.Objects.requireNonNull; -import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; +import com.google.common.annotations.Beta; import io.netty.channel.Channel; -import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.handler.ssl.SslHandler; import io.netty.util.Timeout; import io.netty.util.Timer; -import io.netty.util.concurrent.GenericFutureListener; +import io.netty.util.concurrent.Future; import io.netty.util.concurrent.Promise; -import java.util.Optional; import java.util.concurrent.TimeUnit; +import org.checkerframework.checker.index.qual.NonNegative; import org.checkerframework.checker.lock.qual.GuardedBy; +import org.checkerframework.checker.lock.qual.Holding; +import org.eclipse.jdt.annotation.NonNull; +import org.eclipse.jdt.annotation.Nullable; +import org.opendaylight.netconf.api.CapabilityURN; +import org.opendaylight.netconf.api.NamespaceURN; import org.opendaylight.netconf.api.NetconfDocumentedException; -import org.opendaylight.netconf.api.NetconfMessage; import org.opendaylight.netconf.api.NetconfSessionListener; -import org.opendaylight.netconf.api.NetconfSessionPreferences; -import org.opendaylight.netconf.api.messages.NetconfHelloMessage; +import org.opendaylight.netconf.api.messages.HelloMessage; +import org.opendaylight.netconf.api.messages.NetconfMessage; import org.opendaylight.netconf.api.xml.XmlNetconfConstants; -import org.opendaylight.netconf.nettyutil.handler.FramingMechanismHandlerFactory; +import org.opendaylight.netconf.nettyutil.handler.ChunkedFramingMechanismEncoder; import org.opendaylight.netconf.nettyutil.handler.NetconfChunkAggregator; import org.opendaylight.netconf.nettyutil.handler.NetconfMessageToXMLEncoder; import org.opendaylight.netconf.nettyutil.handler.NetconfXMLToHelloMessageDecoder; import org.opendaylight.netconf.nettyutil.handler.NetconfXMLToMessageDecoder; -import org.opendaylight.netconf.util.messages.FramingMechanism; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.w3c.dom.Document; import org.w3c.dom.NodeList; -public abstract class AbstractNetconfSessionNegotiator
, L extends NetconfSessionListener>
+public abstract class AbstractNetconfSessionNegotiator,
+ L extends NetconfSessionListener>
extends ChannelInboundHandlerAdapter implements NetconfSessionNegotiator {
/**
* Possible states for Finite State Machine.
@@ -52,40 +55,68 @@ public abstract class AbstractNetconfSessionNegotiator
promise;
private final L sessionListener;
private final Timer timer;
+ @GuardedBy("this")
private Timeout timeoutTask;
-
@GuardedBy("this")
private State state = State.IDLE;
- protected AbstractNetconfSessionNegotiator(final P sessionPreferences, final Promise getSslHandler(final Channel channel) {
- return Optional.ofNullable(channel.pipeline().get(SslHandler.class));
+ private synchronized State state() {
+ return state;
}
- public P getSessionPreferences() {
- return sessionPreferences;
+ private static @Nullable SslHandler getSslHandler(final Channel channel) {
+ return channel.pipeline().get(SslHandler.class);
}
private void start() {
- final NetconfHelloMessage helloMessage = this.sessionPreferences.getHelloMessage();
- LOG.debug("Session negotiation started with hello message {} on channel {}", helloMessage, channel);
+ LOG.debug("Sending negotiation proposal {} on channel {}", localHello, channel);
+
+ // Send the message out, but to not run listeners just yet, as we have some more state transitions to go through
+ final var helloFuture = channel.writeAndFlush(localHello);
+
+ // Quick check: if the future has already failed we call it quits before negotiation even started
+ final var helloCause = helloFuture.cause();
+ if (helloCause != null) {
+ LOG.warn("Failed to send negotiation proposal on channel {}", channel, helloCause);
+ failAndClose();
+ return;
+ }
+
+ // Catch any exceptions from this point on. Use a named class to ease debugging.
+ final class ExceptionHandlingInboundChannelHandler extends ChannelInboundHandlerAdapter {
+ @Override
+ public void exceptionCaught(final ChannelHandlerContext ctx, final Throwable cause) {
+ LOG.warn("An exception occurred during negotiation with {} on channel {}",
+ channel.remoteAddress(), channel, cause);
+ // FIXME: this is quite suspect as it is competing with timeoutExpired() without synchronization
+ cancelTimeout();
+ negotiationFailed(cause);
+ changeState(State.FAILED);
+ }
+ }
channel.pipeline().addLast(NAME_OF_EXCEPTION_HANDLER, new ExceptionHandlingInboundChannelHandler());
- sendMessage(helloMessage);
+ // Remove special outbound handler for hello message. Insert regular netconf xml message (en|de)coders.
+ replaceChannelHandler(channel, AbstractChannelInitializer.NETCONF_MESSAGE_ENCODER,
+ new NetconfMessageToXMLEncoder());
+
+ synchronized (this) {
+ lockedChangeState(State.OPEN_WAIT);
- replaceHelloMessageOutboundHandler();
- changeState(State.OPEN_WAIT);
+ // Service the timeout on channel's eventloop, so that we do not get state transition problems
+ timeoutTask = timer.newTimeout(unused -> channel.eventLoop().execute(this::timeoutExpired),
+ connectionTimeoutMillis, TimeUnit.MILLISECONDS);
+ }
+
+ LOG.debug("Session negotiation started on channel {}", channel);
+
+ // State transition completed, now run any additional processing
+ helloFuture.addListener(this::onHelloWriteComplete);
+ }
- timeoutTask = this.timer.newTimeout(this::timeoutExpired, connectionTimeoutMillis, TimeUnit.MILLISECONDS);
+ private void onHelloWriteComplete(final Future> future) {
+ final var cause = future.cause();
+ if (cause != null) {
+ LOG.info("Failed to send message {} on channel {}", localHello, channel, cause);
+ negotiationFailed(cause);
+ } else {
+ LOG.trace("Message {} sent to socket on channel {}", localHello, channel);
+ }
}
- private synchronized void timeoutExpired(final Timeout timeout) {
+ private synchronized void timeoutExpired() {
+ if (timeoutTask == null) {
+ // cancelTimeout() between expiry and execution on the loop
+ return;
+ }
+ timeoutTask = null;
+
if (state != State.ESTABLISHED) {
- LOG.debug("Connection timeout after {}, session backed by channel {} is in state {}", timeout, channel,
- state);
+ LOG.debug("Connection timeout after {}ms, session backed by channel {} is in state {}",
+ connectionTimeoutMillis, channel, state);
// Do not fail negotiation if promise is done or canceled
// It would result in setting result of the promise second time and that throws exception
if (!promise.isDone() && !promise.isCancelled()) {
LOG.warn("Netconf session backed by channel {} was not established after {}", channel,
connectionTimeoutMillis);
- changeState(State.FAILED);
-
- channel.close().addListener((GenericFutureListener {
- if (!f.isSuccess()) {
- LOG.info("Failed to send message {} on channel {}", msg, channel, f.cause());
- negotiationFailed(f.cause());
- } else {
- LOG.trace("Message {} sent to socket on channel {}", msg, channel);
- }
- });
- }
-
@Override
@SuppressWarnings("checkstyle:illegalCatch")
public final void channelActive(final ChannelHandlerContext ctx) {
@@ -317,9 +360,14 @@ public abstract class AbstractNetconfSessionNegotiator promise,
- final Channel channel, final Timer timer,
- final L sessionListener, final long connectionTimeoutMillis) {
- this.channel = requireNonNull(channel);
+ protected AbstractNetconfSessionNegotiator(final HelloMessage hello, final Promise promise,
+ final Channel channel, final Timer timer, final L sessionListener,
+ final long connectionTimeoutMillis,
+ final @NonNegative int maximumIncomingChunkSize) {
+ localHello = requireNonNull(hello);
this.promise = requireNonNull(promise);
- this.sessionPreferences = sessionPreferences;
+ this.channel = requireNonNull(channel);
this.timer = timer;
this.sessionListener = sessionListener;
this.connectionTimeoutMillis = connectionTimeoutMillis;
+ this.maximumIncomingChunkSize = maximumIncomingChunkSize;
+ checkArgument(maximumIncomingChunkSize > 0, "Invalid maximum incoming chunk size %s", maximumIncomingChunkSize);
+ }
+
+ protected final @NonNull HelloMessage localHello() {
+ return localHello;
}
protected final void startNegotiation() {
if (ifNegotiatedAlready()) {
LOG.debug("Negotiation on channel {} already started", channel);
} else {
- final Optional