*/
package org.opendaylight.netconf.nettyutil;
+import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import static java.util.Objects.requireNonNull;
+import com.google.common.annotations.Beta;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.util.Timeout;
import io.netty.util.Timer;
import io.netty.util.concurrent.Promise;
-import java.util.Optional;
import java.util.concurrent.TimeUnit;
+import org.checkerframework.checker.index.qual.NonNegative;
import org.checkerframework.checker.lock.qual.GuardedBy;
+import org.checkerframework.checker.lock.qual.Holding;
import org.eclipse.jdt.annotation.NonNull;
+import org.eclipse.jdt.annotation.Nullable;
import org.opendaylight.netconf.api.NetconfDocumentedException;
import org.opendaylight.netconf.api.NetconfMessage;
import org.opendaylight.netconf.api.NetconfSessionListener;
private static final Logger LOG = LoggerFactory.getLogger(AbstractNetconfSessionNegotiator.class);
private static final String NAME_OF_EXCEPTION_HANDLER = "lastExceptionHandler";
+ private static final String DEFAULT_MAXIMUM_CHUNK_SIZE_PROP = "org.opendaylight.netconf.default.maximum.chunk.size";
+ private static final int DEFAULT_MAXIMUM_CHUNK_SIZE_DEFAULT = 16 * 1024 * 1024;
+
+ /**
+ * Default upper bound on the size of an individual chunk. This value can be controlled through
+ * {@value #DEFAULT_MAXIMUM_CHUNK_SIZE_PROP} system property and defaults to
+ * {@value #DEFAULT_MAXIMUM_CHUNK_SIZE_DEFAULT} bytes.
+ */
+ @Beta
+ public static final @NonNegative int DEFAULT_MAXIMUM_INCOMING_CHUNK_SIZE;
+
+ static {
+ final int propValue = Integer.getInteger(DEFAULT_MAXIMUM_CHUNK_SIZE_PROP, DEFAULT_MAXIMUM_CHUNK_SIZE_DEFAULT);
+ if (propValue <= 0) {
+ LOG.warn("Ignoring invalid {} value {}", DEFAULT_MAXIMUM_CHUNK_SIZE_PROP, propValue);
+ DEFAULT_MAXIMUM_INCOMING_CHUNK_SIZE = DEFAULT_MAXIMUM_CHUNK_SIZE_DEFAULT;
+ } else {
+ DEFAULT_MAXIMUM_INCOMING_CHUNK_SIZE = propValue;
+ }
+ LOG.debug("Default maximum incoming NETCONF chunk size is {} bytes", DEFAULT_MAXIMUM_INCOMING_CHUNK_SIZE);
+ }
private final @NonNull NetconfHelloMessage localHello;
protected final Channel channel;
+ private final @NonNegative int maximumIncomingChunkSize;
private final long connectionTimeoutMillis;
private final Promise<S> promise;
private final L sessionListener;
private final Timer timer;
+ @GuardedBy("this")
private Timeout timeoutTask;
-
@GuardedBy("this")
private State state = State.IDLE;
protected AbstractNetconfSessionNegotiator(final NetconfHelloMessage hello, final Promise<S> promise,
- final Channel channel, final Timer timer,
- final L sessionListener, final long connectionTimeoutMillis) {
+ final Channel channel, final Timer timer, final L sessionListener,
+ final long connectionTimeoutMillis,
+ final @NonNegative int maximumIncomingChunkSize) {
this.localHello = requireNonNull(hello);
this.promise = requireNonNull(promise);
this.channel = requireNonNull(channel);
this.timer = timer;
this.sessionListener = sessionListener;
this.connectionTimeoutMillis = connectionTimeoutMillis;
+ this.maximumIncomingChunkSize = maximumIncomingChunkSize;
+ checkArgument(maximumIncomingChunkSize > 0, "Invalid maximum incoming chunk size %s", maximumIncomingChunkSize);
+ }
+
+ @Deprecated(since = "4.0.1", forRemoval = true)
+ protected AbstractNetconfSessionNegotiator(final NetconfHelloMessage hello, final Promise<S> promise,
+ final Channel channel, final Timer timer,
+ final L sessionListener, final long connectionTimeoutMillis) {
+ this(hello, promise, channel, timer, sessionListener, connectionTimeoutMillis,
+ DEFAULT_MAXIMUM_INCOMING_CHUNK_SIZE);
}
protected final @NonNull NetconfHelloMessage localHello() {
if (ifNegotiatedAlready()) {
LOG.debug("Negotiation on channel {} already started", channel);
} else {
- final Optional<SslHandler> sslHandler = getSslHandler(channel);
- if (sslHandler.isPresent()) {
- sslHandler.get().handshakeFuture().addListener(future -> {
+ final var sslHandler = getSslHandler(channel);
+ if (sslHandler != null) {
+ sslHandler.handshakeFuture().addListener(future -> {
checkState(future.isSuccess(), "Ssl handshake was not successful");
LOG.debug("Ssl handshake complete");
start();
return this.state != State.IDLE;
}
- private static Optional<SslHandler> getSslHandler(final Channel channel) {
- return Optional.ofNullable(channel.pipeline().get(SslHandler.class));
+ private static @Nullable SslHandler getSslHandler(final Channel channel) {
+ return channel.pipeline().get(SslHandler.class);
}
private void start() {
sendMessage(localHello);
replaceHelloMessageOutboundHandler();
- changeState(State.OPEN_WAIT);
- timeoutTask = this.timer.newTimeout(this::timeoutExpired, connectionTimeoutMillis, TimeUnit.MILLISECONDS);
+ synchronized (this) {
+ lockedChangeState(State.OPEN_WAIT);
+
+ // Service the timeout on channel's eventloop, so that we do not get state transition problems
+ timeoutTask = timer.newTimeout(unused -> channel.eventLoop().execute(this::timeoutExpired),
+ connectionTimeoutMillis, TimeUnit.MILLISECONDS);
+ }
}
- private synchronized void timeoutExpired(final Timeout timeout) {
+ private synchronized void timeoutExpired() {
+ if (timeoutTask == null) {
+ // cancelTimeout() between expiry and execution on the loop
+ return;
+ }
+ timeoutTask = null;
+
if (state != State.ESTABLISHED) {
- LOG.debug("Connection timeout after {}, session backed by channel {} is in state {}", timeout, channel,
- state);
+ LOG.debug("Connection timeout after {}ms, session backed by channel {} is in state {}",
+ connectionTimeoutMillis, channel, state);
// Do not fail negotiation if promise is done or canceled
// It would result in setting result of the promise second time and that throws exception
}
}
- private void cancelTimeout() {
- if (timeoutTask != null) {
- timeoutTask.cancel();
+ private synchronized void cancelTimeout() {
+ if (timeoutTask != null && !timeoutTask.cancel()) {
+ // Late-coming cancel: make sure
+ timeoutTask = null;
}
}
replaceChannelHandler(channel, AbstractChannelInitializer.NETCONF_MESSAGE_FRAME_ENCODER,
FramingMechanismHandlerFactory.createHandler(FramingMechanism.CHUNK));
replaceChannelHandler(channel, AbstractChannelInitializer.NETCONF_MESSAGE_AGGREGATOR,
- new NetconfChunkAggregator());
+ new NetconfChunkAggregator(maximumIncomingChunkSize));
}
private boolean shouldUseChunkFraming(final Document doc) {
}
private synchronized void changeState(final State newState) {
+ lockedChangeState(newState);
+ }
+
+ @Holding("this")
+ private void lockedChangeState(final State newState) {
LOG.debug("Changing state from : {} to : {} for channel: {}", state, newState, channel);
checkState(isStateChangePermitted(state, newState),
"Cannot change state from %s to %s for channel %s", state, newState, channel);