import static com.google.common.base.Preconditions.checkState;
import static java.util.Objects.requireNonNull;
+import com.google.common.annotations.Beta;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.util.concurrent.Promise;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
+import org.checkerframework.checker.index.qual.NonNegative;
import org.checkerframework.checker.lock.qual.GuardedBy;
+import org.eclipse.jdt.annotation.NonNull;
import org.opendaylight.netconf.api.NetconfDocumentedException;
import org.opendaylight.netconf.api.NetconfMessage;
import org.opendaylight.netconf.api.NetconfSessionListener;
-import org.opendaylight.netconf.api.NetconfSessionPreferences;
import org.opendaylight.netconf.api.messages.NetconfHelloMessage;
import org.opendaylight.netconf.api.xml.XmlNetconfConstants;
import org.opendaylight.netconf.nettyutil.handler.FramingMechanismHandlerFactory;
import org.w3c.dom.Document;
import org.w3c.dom.NodeList;
-public abstract class AbstractNetconfSessionNegotiator<P extends NetconfSessionPreferences,
- S extends AbstractNetconfSession<S, L>, L extends NetconfSessionListener<S>>
+public abstract class AbstractNetconfSessionNegotiator<S extends AbstractNetconfSession<S, L>,
+ L extends NetconfSessionListener<S>>
extends ChannelInboundHandlerAdapter implements NetconfSessionNegotiator<S> {
/**
* Possible states for Finite State Machine.
private static final Logger LOG = LoggerFactory.getLogger(AbstractNetconfSessionNegotiator.class);
private static final String NAME_OF_EXCEPTION_HANDLER = "lastExceptionHandler";
+ private static final String DEFAULT_MAXIMUM_CHUNK_SIZE_PROP = "org.opendaylight.netconf.default.maximum.chunk.size";
+ private static final int DEFAULT_MAXIMUM_CHUNK_SIZE_DEFAULT = 16 * 1024 * 1024;
- protected final P sessionPreferences;
+ /**
+ * Default upper bound on the size of an individual chunk. This value can be controlled through
+ * {@value #DEFAULT_MAXIMUM_CHUNK_SIZE_PROP} system property and defaults to
+ * {@value #DEFAULT_MAXIMUM_CHUNK_SIZE_DEFAULT} bytes.
+ */
+ @Beta
+ public static final @NonNegative int DEFAULT_MAXIMUM_INCOMING_CHUNK_SIZE;
+
+ static {
+ final int propValue = Integer.getInteger(DEFAULT_MAXIMUM_CHUNK_SIZE_PROP, DEFAULT_MAXIMUM_CHUNK_SIZE_DEFAULT);
+ if (propValue <= 0) {
+ LOG.warn("Ignoring invalid {} value {}", DEFAULT_MAXIMUM_CHUNK_SIZE_PROP, propValue);
+ DEFAULT_MAXIMUM_INCOMING_CHUNK_SIZE = DEFAULT_MAXIMUM_CHUNK_SIZE_DEFAULT;
+ } else {
+ DEFAULT_MAXIMUM_INCOMING_CHUNK_SIZE = propValue;
+ }
+ LOG.debug("Default maximum incoming NETCONF chunk size is {} bytes", DEFAULT_MAXIMUM_INCOMING_CHUNK_SIZE);
+ }
+
+ private final @NonNull NetconfHelloMessage localHello;
protected final Channel channel;
private final long connectionTimeoutMillis;
@GuardedBy("this")
private State state = State.IDLE;
- protected AbstractNetconfSessionNegotiator(final P sessionPreferences, final Promise<S> promise,
+ protected AbstractNetconfSessionNegotiator(final NetconfHelloMessage hello, final Promise<S> promise,
final Channel channel, final Timer timer,
final L sessionListener, final long connectionTimeoutMillis) {
- this.channel = requireNonNull(channel);
+ this.localHello = requireNonNull(hello);
this.promise = requireNonNull(promise);
- this.sessionPreferences = sessionPreferences;
+ this.channel = requireNonNull(channel);
this.timer = timer;
this.sessionListener = sessionListener;
this.connectionTimeoutMillis = connectionTimeoutMillis;
}
+ protected final @NonNull NetconfHelloMessage localHello() {
+ return localHello;
+ }
+
protected final void startNegotiation() {
if (ifNegotiatedAlready()) {
LOG.debug("Negotiation on channel {} already started", channel);
return Optional.ofNullable(channel.pipeline().get(SslHandler.class));
}
- public final P getSessionPreferences() {
- return sessionPreferences;
- }
-
private void start() {
- final NetconfHelloMessage helloMessage = this.sessionPreferences.getHelloMessage();
- LOG.debug("Session negotiation started with hello message {} on channel {}", helloMessage, channel);
+ LOG.debug("Session negotiation started with hello message {} on channel {}", localHello, channel);
channel.pipeline().addLast(NAME_OF_EXCEPTION_HANDLER, new ExceptionHandlingInboundChannelHandler());
- sendMessage(helloMessage);
+ sendMessage(localHello);
replaceHelloMessageOutboundHandler();
changeState(State.OPEN_WAIT);
replaceChannelHandler(channel, AbstractChannelInitializer.NETCONF_MESSAGE_FRAME_ENCODER,
FramingMechanismHandlerFactory.createHandler(FramingMechanism.CHUNK));
replaceChannelHandler(channel, AbstractChannelInitializer.NETCONF_MESSAGE_AGGREGATOR,
- new NetconfChunkAggregator());
+ new NetconfChunkAggregator(DEFAULT_MAXIMUM_INCOMING_CHUNK_SIZE));
}
private boolean shouldUseChunkFraming(final Document doc) {
- return containsBase11Capability(doc)
- && containsBase11Capability(sessionPreferences.getHelloMessage().getDocument());
+ return containsBase11Capability(doc) && containsBase11Capability(localHello.getDocument());
}
/**