X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=netconf%2Fnetconf-netty-util%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fnetconf%2Fnettyutil%2FAbstractNetconfSessionNegotiator.java;h=89c0be106837c21a907587dedca8dcb491b20a39;hb=fce09746d590a6465abb732ebe69582eeba8336c;hp=b3d35dc3c0d05bd8811505b4c450948052c9f0ff;hpb=01a3b9adce74d7c6a10646ebbf0a3f4998a76d41;p=netconf.git diff --git a/netconf/netconf-netty-util/src/main/java/org/opendaylight/netconf/nettyutil/AbstractNetconfSessionNegotiator.java b/netconf/netconf-netty-util/src/main/java/org/opendaylight/netconf/nettyutil/AbstractNetconfSessionNegotiator.java index b3d35dc3c0..89c0be1068 100644 --- a/netconf/netconf-netty-util/src/main/java/org/opendaylight/netconf/nettyutil/AbstractNetconfSessionNegotiator.java +++ b/netconf/netconf-netty-util/src/main/java/org/opendaylight/netconf/nettyutil/AbstractNetconfSessionNegotiator.java @@ -5,28 +5,30 @@ * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ - package org.opendaylight.netconf.nettyutil; -import com.google.common.base.Optional; -import com.google.common.base.Preconditions; +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkState; +import static java.util.Objects.requireNonNull; + +import com.google.common.annotations.Beta; import io.netty.channel.Channel; -import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.handler.ssl.SslHandler; import io.netty.util.Timeout; import io.netty.util.Timer; -import io.netty.util.TimerTask; -import io.netty.util.concurrent.Future; -import io.netty.util.concurrent.GenericFutureListener; import io.netty.util.concurrent.Promise; import java.util.concurrent.TimeUnit; +import org.checkerframework.checker.index.qual.NonNegative; +import org.checkerframework.checker.lock.qual.GuardedBy; +import org.checkerframework.checker.lock.qual.Holding; +import org.eclipse.jdt.annotation.NonNull; +import org.eclipse.jdt.annotation.Nullable; import org.opendaylight.netconf.api.NetconfDocumentedException; import org.opendaylight.netconf.api.NetconfMessage; import org.opendaylight.netconf.api.NetconfSessionListener; -import org.opendaylight.netconf.api.NetconfSessionPreferences; import org.opendaylight.netconf.api.messages.NetconfHelloMessage; import org.opendaylight.netconf.api.xml.XmlNetconfConstants; import org.opendaylight.netconf.nettyutil.handler.FramingMechanismHandlerFactory; @@ -35,131 +37,169 @@ import org.opendaylight.netconf.nettyutil.handler.NetconfMessageToXMLEncoder; import org.opendaylight.netconf.nettyutil.handler.NetconfXMLToHelloMessageDecoder; import org.opendaylight.netconf.nettyutil.handler.NetconfXMLToMessageDecoder; import org.opendaylight.netconf.util.messages.FramingMechanism; -import org.opendaylight.protocol.framework.AbstractSessionNegotiator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.w3c.dom.Document; import org.w3c.dom.NodeList; -public abstract class AbstractNetconfSessionNegotiator
, L extends NetconfSessionListener
* Inbound hello message handler should be kept until negotiation is successful
* It caches any non-hello messages while negotiation is still in progress
*/
protected final void replaceHelloMessageInboundHandler(final S session) {
- ChannelHandler helloMessageHandler = replaceChannelHandler(channel, AbstractChannelInitializer.NETCONF_MESSAGE_DECODER, new NetconfXMLToMessageDecoder());
+ ChannelHandler helloMessageHandler = replaceChannelHandler(channel,
+ AbstractChannelInitializer.NETCONF_MESSAGE_DECODER, new NetconfXMLToMessageDecoder());
- Preconditions.checkState(helloMessageHandler instanceof NetconfXMLToHelloMessageDecoder,
+ checkState(helloMessageHandler instanceof NetconfXMLToHelloMessageDecoder,
"Pipeline handlers misplaced on session: %s, pipeline: %s", session, channel.pipeline());
Iterable {
+ if (!f.isSuccess()) {
+ LOG.info("Failed to send message {} on channel {}", msg, channel, f.cause());
+ negotiationFailed(f.cause());
+ } else {
+ LOG.trace("Message {} sent to socket on channel {}", msg, channel);
+ }
+ });
+ }
+
+ @Override
+ @SuppressWarnings("checkstyle:illegalCatch")
+ public final void channelActive(final ChannelHandlerContext ctx) {
+ LOG.debug("Starting session negotiation on channel {}", channel);
+ try {
+ startNegotiation();
+ } catch (final Exception e) {
+ LOG.warn("Unexpected negotiation failure on channel {}", channel, e);
+ negotiationFailed(e);
+ }
+ }
+
+ @Override
+ @SuppressWarnings("checkstyle:illegalCatch")
+ public final void channelRead(final ChannelHandlerContext ctx, final Object msg) {
+ LOG.debug("Negotiation read invoked on channel {}", channel);
+ try {
+ handleMessage((NetconfHelloMessage) msg);
+ } catch (final Exception e) {
+ LOG.debug("Unexpected error while handling negotiation message {} on channel {}", msg, channel, e);
+ negotiationFailed(e);
+ }
+ }
+
+ @Override
+ public void exceptionCaught(final ChannelHandlerContext ctx, final Throwable cause) {
+ LOG.info("Unexpected error during negotiation on channel {}", channel, cause);
+ negotiationFailed(cause);
+ }
+
+ protected abstract void handleMessage(NetconfHelloMessage msg) throws Exception;
}
>
- extends AbstractSessionNegotiator,
+ L extends NetconfSessionListener>
+ extends ChannelInboundHandlerAdapter implements NetconfSessionNegotiator {
+ /**
+ * Possible states for Finite State Machine.
+ */
+ protected enum State {
+ IDLE, OPEN_WAIT, FAILED, ESTABLISHED
+ }
private static final Logger LOG = LoggerFactory.getLogger(AbstractNetconfSessionNegotiator.class);
-
- public static final String NAME_OF_EXCEPTION_HANDLER = "lastExceptionHandler";
-
- protected final P sessionPreferences;
-
- private final L sessionListener;
- private Timeout timeout;
+ private static final String NAME_OF_EXCEPTION_HANDLER = "lastExceptionHandler";
+ private static final String DEFAULT_MAXIMUM_CHUNK_SIZE_PROP = "org.opendaylight.netconf.default.maximum.chunk.size";
+ private static final int DEFAULT_MAXIMUM_CHUNK_SIZE_DEFAULT = 16 * 1024 * 1024;
/**
- * Possible states for Finite State Machine
+ * Default upper bound on the size of an individual chunk. This value can be controlled through
+ * {@value #DEFAULT_MAXIMUM_CHUNK_SIZE_PROP} system property and defaults to
+ * {@value #DEFAULT_MAXIMUM_CHUNK_SIZE_DEFAULT} bytes.
*/
- protected enum State {
- IDLE, OPEN_WAIT, FAILED, ESTABLISHED
+ @Beta
+ public static final @NonNegative int DEFAULT_MAXIMUM_INCOMING_CHUNK_SIZE;
+
+ static {
+ final int propValue = Integer.getInteger(DEFAULT_MAXIMUM_CHUNK_SIZE_PROP, DEFAULT_MAXIMUM_CHUNK_SIZE_DEFAULT);
+ if (propValue <= 0) {
+ LOG.warn("Ignoring invalid {} value {}", DEFAULT_MAXIMUM_CHUNK_SIZE_PROP, propValue);
+ DEFAULT_MAXIMUM_INCOMING_CHUNK_SIZE = DEFAULT_MAXIMUM_CHUNK_SIZE_DEFAULT;
+ } else {
+ DEFAULT_MAXIMUM_INCOMING_CHUNK_SIZE = propValue;
+ }
+ LOG.debug("Default maximum incoming NETCONF chunk size is {} bytes", DEFAULT_MAXIMUM_INCOMING_CHUNK_SIZE);
}
- private State state = State.IDLE;
+ private final @NonNull NetconfHelloMessage localHello;
+ protected final Channel channel;
+
+ private final @NonNegative int maximumIncomingChunkSize;
+ private final long connectionTimeoutMillis;
private final Promise promise;
+ private final L sessionListener;
private final Timer timer;
- private final long connectionTimeoutMillis;
- protected AbstractNetconfSessionNegotiator(final P sessionPreferences, final Promise promise, final Channel channel, final Timer timer,
- final L sessionListener, final long connectionTimeoutMillis) {
- super(promise, channel);
- this.sessionPreferences = sessionPreferences;
- this.promise = promise;
+ @GuardedBy("this")
+ private Timeout timeoutTask;
+ @GuardedBy("this")
+ private State state = State.IDLE;
+
+ protected AbstractNetconfSessionNegotiator(final NetconfHelloMessage hello, final Promise promise,
+ final Channel channel, final Timer timer, final L sessionListener,
+ final long connectionTimeoutMillis,
+ final @NonNegative int maximumIncomingChunkSize) {
+ this.localHello = requireNonNull(hello);
+ this.promise = requireNonNull(promise);
+ this.channel = requireNonNull(channel);
this.timer = timer;
this.sessionListener = sessionListener;
this.connectionTimeoutMillis = connectionTimeoutMillis;
+ this.maximumIncomingChunkSize = maximumIncomingChunkSize;
+ checkArgument(maximumIncomingChunkSize > 0, "Invalid maximum incoming chunk size %s", maximumIncomingChunkSize);
+ }
+
+ @Deprecated(since = "4.0.1", forRemoval = true)
+ protected AbstractNetconfSessionNegotiator(final NetconfHelloMessage hello, final Promise promise,
+ final Channel channel, final Timer timer,
+ final L sessionListener, final long connectionTimeoutMillis) {
+ this(hello, promise, channel, timer, sessionListener, connectionTimeoutMillis,
+ DEFAULT_MAXIMUM_INCOMING_CHUNK_SIZE);
+ }
+
+ protected final @NonNull NetconfHelloMessage localHello() {
+ return localHello;
}
- @Override
protected final void startNegotiation() {
- final Optional