X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=netconf%2Fnetconf-netty-util%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fnetconf%2Fnettyutil%2FAbstractNetconfSessionNegotiator.java;h=266f2cf490666afa2798a63cbe0b47865242f1fa;hb=refs%2Fchanges%2F15%2F102615%2F2;hp=b3d35dc3c0d05bd8811505b4c450948052c9f0ff;hpb=5c0f8c590b10ef29ac12429c59d08d9a694019ee;p=netconf.git diff --git a/netconf/netconf-netty-util/src/main/java/org/opendaylight/netconf/nettyutil/AbstractNetconfSessionNegotiator.java b/netconf/netconf-netty-util/src/main/java/org/opendaylight/netconf/nettyutil/AbstractNetconfSessionNegotiator.java index b3d35dc3c0..266f2cf490 100644 --- a/netconf/netconf-netty-util/src/main/java/org/opendaylight/netconf/nettyutil/AbstractNetconfSessionNegotiator.java +++ b/netconf/netconf-netty-util/src/main/java/org/opendaylight/netconf/nettyutil/AbstractNetconfSessionNegotiator.java @@ -5,28 +5,30 @@ * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ - package org.opendaylight.netconf.nettyutil; -import com.google.common.base.Optional; -import com.google.common.base.Preconditions; +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkState; +import static java.util.Objects.requireNonNull; + +import com.google.common.annotations.Beta; import io.netty.channel.Channel; -import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.handler.ssl.SslHandler; import io.netty.util.Timeout; import io.netty.util.Timer; -import io.netty.util.TimerTask; -import io.netty.util.concurrent.Future; -import io.netty.util.concurrent.GenericFutureListener; import io.netty.util.concurrent.Promise; import java.util.concurrent.TimeUnit; +import org.checkerframework.checker.index.qual.NonNegative; +import org.checkerframework.checker.lock.qual.GuardedBy; +import org.checkerframework.checker.lock.qual.Holding; +import org.eclipse.jdt.annotation.NonNull; +import org.eclipse.jdt.annotation.Nullable; import org.opendaylight.netconf.api.NetconfDocumentedException; import org.opendaylight.netconf.api.NetconfMessage; import org.opendaylight.netconf.api.NetconfSessionListener; -import org.opendaylight.netconf.api.NetconfSessionPreferences; import org.opendaylight.netconf.api.messages.NetconfHelloMessage; import org.opendaylight.netconf.api.xml.XmlNetconfConstants; import org.opendaylight.netconf.nettyutil.handler.FramingMechanismHandlerFactory; @@ -35,131 +37,169 @@ import org.opendaylight.netconf.nettyutil.handler.NetconfMessageToXMLEncoder; import org.opendaylight.netconf.nettyutil.handler.NetconfXMLToHelloMessageDecoder; import org.opendaylight.netconf.nettyutil.handler.NetconfXMLToMessageDecoder; import org.opendaylight.netconf.util.messages.FramingMechanism; -import org.opendaylight.protocol.framework.AbstractSessionNegotiator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.w3c.dom.Document; import org.w3c.dom.NodeList; -public abstract class AbstractNetconfSessionNegotiator

, L extends NetconfSessionListener> - extends AbstractSessionNegotiator { +public abstract class AbstractNetconfSessionNegotiator, + L extends NetconfSessionListener> + extends ChannelInboundHandlerAdapter implements NetconfSessionNegotiator { + /** + * Possible states for Finite State Machine. + */ + protected enum State { + IDLE, OPEN_WAIT, FAILED, ESTABLISHED + } private static final Logger LOG = LoggerFactory.getLogger(AbstractNetconfSessionNegotiator.class); - - public static final String NAME_OF_EXCEPTION_HANDLER = "lastExceptionHandler"; - - protected final P sessionPreferences; - - private final L sessionListener; - private Timeout timeout; + private static final String NAME_OF_EXCEPTION_HANDLER = "lastExceptionHandler"; + private static final String DEFAULT_MAXIMUM_CHUNK_SIZE_PROP = "org.opendaylight.netconf.default.maximum.chunk.size"; + private static final int DEFAULT_MAXIMUM_CHUNK_SIZE_DEFAULT = 16 * 1024 * 1024; /** - * Possible states for Finite State Machine + * Default upper bound on the size of an individual chunk. This value can be controlled through + * {@value #DEFAULT_MAXIMUM_CHUNK_SIZE_PROP} system property and defaults to + * {@value #DEFAULT_MAXIMUM_CHUNK_SIZE_DEFAULT} bytes. */ - protected enum State { - IDLE, OPEN_WAIT, FAILED, ESTABLISHED + @Beta + public static final @NonNegative int DEFAULT_MAXIMUM_INCOMING_CHUNK_SIZE; + + static { + final int propValue = Integer.getInteger(DEFAULT_MAXIMUM_CHUNK_SIZE_PROP, DEFAULT_MAXIMUM_CHUNK_SIZE_DEFAULT); + if (propValue <= 0) { + LOG.warn("Ignoring invalid {} value {}", DEFAULT_MAXIMUM_CHUNK_SIZE_PROP, propValue); + DEFAULT_MAXIMUM_INCOMING_CHUNK_SIZE = DEFAULT_MAXIMUM_CHUNK_SIZE_DEFAULT; + } else { + DEFAULT_MAXIMUM_INCOMING_CHUNK_SIZE = propValue; + } + LOG.debug("Default maximum incoming NETCONF chunk size is {} bytes", DEFAULT_MAXIMUM_INCOMING_CHUNK_SIZE); } - private State state = State.IDLE; + private final @NonNull NetconfHelloMessage localHello; + protected final Channel channel; + + private final @NonNegative int maximumIncomingChunkSize; + private final long connectionTimeoutMillis; private final Promise promise; + private final L sessionListener; private final Timer timer; - private final long connectionTimeoutMillis; - protected AbstractNetconfSessionNegotiator(final P sessionPreferences, final Promise promise, final Channel channel, final Timer timer, - final L sessionListener, final long connectionTimeoutMillis) { - super(promise, channel); - this.sessionPreferences = sessionPreferences; - this.promise = promise; + @GuardedBy("this") + private Timeout timeoutTask; + @GuardedBy("this") + private State state = State.IDLE; + + protected AbstractNetconfSessionNegotiator(final NetconfHelloMessage hello, final Promise promise, + final Channel channel, final Timer timer, final L sessionListener, + final long connectionTimeoutMillis, + final @NonNegative int maximumIncomingChunkSize) { + this.localHello = requireNonNull(hello); + this.promise = requireNonNull(promise); + this.channel = requireNonNull(channel); this.timer = timer; this.sessionListener = sessionListener; this.connectionTimeoutMillis = connectionTimeoutMillis; + this.maximumIncomingChunkSize = maximumIncomingChunkSize; + checkArgument(maximumIncomingChunkSize > 0, "Invalid maximum incoming chunk size %s", maximumIncomingChunkSize); + } + + @Deprecated(since = "4.0.1", forRemoval = true) + protected AbstractNetconfSessionNegotiator(final NetconfHelloMessage hello, final Promise promise, + final Channel channel, final Timer timer, + final L sessionListener, final long connectionTimeoutMillis) { + this(hello, promise, channel, timer, sessionListener, connectionTimeoutMillis, + DEFAULT_MAXIMUM_INCOMING_CHUNK_SIZE); + } + + protected final @NonNull NetconfHelloMessage localHello() { + return localHello; } - @Override protected final void startNegotiation() { - final Optional sslHandler = getSslHandler(channel); - if (sslHandler.isPresent()) { - Future future = sslHandler.get().handshakeFuture(); - future.addListener(new GenericFutureListener>() { - @Override - public void operationComplete(final Future future) { - Preconditions.checkState(future.isSuccess(), "Ssl handshake was not successful"); + if (ifNegotiatedAlready()) { + LOG.debug("Negotiation on channel {} already started", channel); + } else { + final var sslHandler = getSslHandler(channel); + if (sslHandler != null) { + sslHandler.handshakeFuture().addListener(future -> { + checkState(future.isSuccess(), "Ssl handshake was not successful"); LOG.debug("Ssl handshake complete"); start(); - } - }); - } else { - start(); + }); + } else { + start(); + } } } - private static Optional getSslHandler(final Channel channel) { - final SslHandler sslHandler = channel.pipeline().get(SslHandler.class); - return sslHandler == null ? Optional. absent() : Optional.of(sslHandler); + protected final synchronized boolean ifNegotiatedAlready() { + // Indicates whether negotiation already started + return this.state != State.IDLE; } - public P getSessionPreferences() { - return sessionPreferences; + private static @Nullable SslHandler getSslHandler(final Channel channel) { + return channel.pipeline().get(SslHandler.class); } private void start() { - final NetconfHelloMessage helloMessage = this.sessionPreferences.getHelloMessage(); - LOG.debug("Session negotiation started with hello message {} on channel {}", helloMessage, channel); + LOG.debug("Session negotiation started with hello message {} on channel {}", localHello, channel); channel.pipeline().addLast(NAME_OF_EXCEPTION_HANDLER, new ExceptionHandlingInboundChannelHandler()); - sendMessage(helloMessage); + sendMessage(localHello); replaceHelloMessageOutboundHandler(); - changeState(State.OPEN_WAIT); - - timeout = this.timer.newTimeout(new TimerTask() { - @Override - public void run(final Timeout timeout) { - synchronized (this) { - if (state != State.ESTABLISHED) { - - LOG.debug("Connection timeout after {}, session is in state {}", timeout, state); - - // Do not fail negotiation if promise is done or canceled - // It would result in setting result of the promise second time and that throws exception - if (isPromiseFinished() == false) { - LOG.warn("Netconf session was not established after {}", connectionTimeoutMillis); - changeState(State.FAILED); - - channel.close().addListener(new GenericFutureListener() { - @Override - public void operationComplete(final ChannelFuture future) throws Exception { - if(future.isSuccess()) { - LOG.debug("Channel {} closed: success", future.channel()); - } else { - LOG.warn("Channel {} closed: fail", future.channel()); - } - } - }); - } - } else if(channel.isOpen()) { - channel.pipeline().remove(NAME_OF_EXCEPTION_HANDLER); - } - } - } - private boolean isPromiseFinished() { - return promise.isDone() || promise.isCancelled(); - } + synchronized (this) { + lockedChangeState(State.OPEN_WAIT); - }, connectionTimeoutMillis, TimeUnit.MILLISECONDS); + // Service the timeout on channel's eventloop, so that we do not get state transition problems + timeoutTask = timer.newTimeout(unused -> channel.eventLoop().execute(this::timeoutExpired), + connectionTimeoutMillis, TimeUnit.MILLISECONDS); + } } - private void cancelTimeout() { - if(timeout!=null) { - timeout.cancel(); + private synchronized void timeoutExpired() { + if (timeoutTask == null) { + // cancelTimeout() between expiry and execution on the loop + return; + } + timeoutTask = null; + + if (state != State.ESTABLISHED) { + LOG.debug("Connection timeout after {}ms, session backed by channel {} is in state {}", + connectionTimeoutMillis, channel, state); + + // Do not fail negotiation if promise is done or canceled + // It would result in setting result of the promise second time and that throws exception + if (!promise.isDone() && !promise.isCancelled()) { + LOG.warn("Netconf session backed by channel {} was not established after {}", channel, + connectionTimeoutMillis); + changeState(State.FAILED); + + channel.close().addListener(future -> { + if (future.isSuccess()) { + LOG.debug("Channel {} closed: success", channel); + } else { + LOG.warn("Channel {} closed: fail", channel); + } + }); + } + } else if (channel.isOpen()) { + channel.pipeline().remove(NAME_OF_EXCEPTION_HANDLER); } } - protected final S getSessionForHelloMessage(final NetconfHelloMessage netconfMessage) throws NetconfDocumentedException { - Preconditions.checkNotNull(netconfMessage, "netconfMessage"); + private synchronized void cancelTimeout() { + if (timeoutTask != null && !timeoutTask.cancel()) { + // Late-coming cancel: make sure + timeoutTask = null; + } + } + protected final S getSessionForHelloMessage(final NetconfHelloMessage netconfMessage) + throws NetconfDocumentedException { final Document doc = netconfMessage.getDocument(); if (shouldUseChunkFraming(doc)) { @@ -170,37 +210,42 @@ public abstract class AbstractNetconfSessionNegotiator

* Inbound hello message handler should be kept until negotiation is successful * It caches any non-hello messages while negotiation is still in progress */ protected final void replaceHelloMessageInboundHandler(final S session) { - ChannelHandler helloMessageHandler = replaceChannelHandler(channel, AbstractChannelInitializer.NETCONF_MESSAGE_DECODER, new NetconfXMLToMessageDecoder()); + ChannelHandler helloMessageHandler = replaceChannelHandler(channel, + AbstractChannelInitializer.NETCONF_MESSAGE_DECODER, new NetconfXMLToMessageDecoder()); - Preconditions.checkState(helloMessageHandler instanceof NetconfXMLToHelloMessageDecoder, + checkState(helloMessageHandler instanceof NetconfXMLToHelloMessageDecoder, "Pipeline handlers misplaced on session: %s, pipeline: %s", session, channel.pipeline()); Iterable netconfMessagesFromNegotiation = ((NetconfXMLToHelloMessageDecoder) helloMessageHandler).getPostHelloNetconfMessages(); // Process messages received during negotiation - // The hello message handler does not have to be synchronized, since it is always call from the same thread by netty + // The hello message handler does not have to be synchronized, + // since it is always call from the same thread by netty. // It means, we are now using the thread now for (NetconfMessage message : netconfMessagesFromNegotiation) { session.handleMessage(message); @@ -211,24 +256,31 @@ public abstract class AbstractNetconfSessionNegotiator

{ + if (!f.isSuccess()) { + LOG.info("Failed to send message {} on channel {}", msg, channel, f.cause()); + negotiationFailed(f.cause()); + } else { + LOG.trace("Message {} sent to socket on channel {}", msg, channel); + } + }); + } + + @Override + @SuppressWarnings("checkstyle:illegalCatch") + public final void channelActive(final ChannelHandlerContext ctx) { + LOG.debug("Starting session negotiation on channel {}", channel); + try { + startNegotiation(); + } catch (final Exception e) { + LOG.warn("Unexpected negotiation failure on channel {}", channel, e); + negotiationFailed(e); + } + } + + @Override + @SuppressWarnings("checkstyle:illegalCatch") + public final void channelRead(final ChannelHandlerContext ctx, final Object msg) { + LOG.debug("Negotiation read invoked on channel {}", channel); + try { + handleMessage((NetconfHelloMessage) msg); + } catch (final Exception e) { + LOG.debug("Unexpected error while handling negotiation message {} on channel {}", msg, channel, e); + negotiationFailed(e); + } + } + + @Override + public void exceptionCaught(final ChannelHandlerContext ctx, final Throwable cause) { + LOG.info("Unexpected error during negotiation on channel {}", channel, cause); + negotiationFailed(cause); + } + + protected abstract void handleMessage(NetconfHelloMessage msg) throws Exception; }