import static java.util.Objects.requireNonNull;
import com.google.common.annotations.VisibleForTesting;
-import io.netty.util.concurrent.EventExecutor;
-import io.netty.util.concurrent.Future;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
+import io.netty.util.Timeout;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CancellationException;
-import java.util.concurrent.Executor;
-import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.checkerframework.checker.lock.qual.GuardedBy;
import org.checkerframework.checker.lock.qual.Holding;
import org.eclipse.jdt.annotation.NonNull;
import org.opendaylight.mdsal.dom.api.DOMNotification;
-import org.opendaylight.netconf.client.NetconfClientDispatcher;
+import org.opendaylight.netconf.client.NetconfClientFactory;
+import org.opendaylight.netconf.client.NetconfClientSession;
import org.opendaylight.netconf.client.conf.NetconfClientConfiguration;
import org.opendaylight.netconf.client.mdsal.LibraryModulesSchemas;
import org.opendaylight.netconf.client.mdsal.LibrarySchemaSourceProvider;
import org.opendaylight.netconf.client.mdsal.api.RemoteDeviceServices;
import org.opendaylight.netconf.client.mdsal.api.SchemaResourceManager;
import org.opendaylight.netconf.client.mdsal.spi.KeepaliveSalFacade;
+import org.opendaylight.netconf.common.NetconfTimer;
+import org.opendaylight.netconf.transport.api.UnsupportedConfigurationException;
import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.Uri;
import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.optional.rev221225.NetconfNodeAugmentedOptional;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev221225.NetconfNode;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev231121.NetconfNode;
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
import org.opendaylight.yangtools.concepts.AbstractRegistration;
import org.opendaylight.yangtools.yang.model.repo.api.YangTextSchemaSource;
* All state associated with a NETCONF topology node. Each node handles its own reconnection.
*/
public final class NetconfNodeHandler extends AbstractRegistration implements RemoteDeviceHandler {
+ private abstract static sealed class Task { // Base type of whatever currentTask holds: a pending connect or a pending backoff timer
+
+ abstract void cancel(); // Abort the pending work; called from removeRegistration()
+ }
+
+ private final class ConnectingTask extends Task implements FutureCallback<NetconfClientSession> { // In-flight connection attempt; also serves as the completion callback of its own future
+ private final ListenableFuture<NetconfClientSession> future;
+
+ ConnectingTask(final ListenableFuture<NetconfClientSession> future) {
+ this.future = requireNonNull(future);
+ }
+
+ @Override
+ void cancel() {
+ future.cancel(false); // false: do not interrupt a thread that may be running the attempt
+ }
+
+ @Override
+ public void onSuccess(final NetconfClientSession result) {
+ connectComplete(this); // Connected: only clear currentTask, the session itself is not used here
+ }
+
+ @Override
+ public void onFailure(final Throwable cause) {
+ if (cause instanceof CancellationException) { // Cancellation is not a failure: clear the task, never retry
+ connectComplete(this);
+ } else {
+ connectFailed(this, cause); // Genuine failure: clear the task, then go through reconnectOrFail()
+ }
+ }
+ }
+
+ private static final class SleepingTask extends Task { // Pending backoff timer scheduled between connection attempts
+ private final Timeout timeout;
+
+ SleepingTask(final Timeout timeout) {
+ this.timeout = requireNonNull(timeout);
+ }
+
+ @Override
+ void cancel() {
+ timeout.cancel(); // Cancel the timer task wrapped by this SleepingTask
+ }
+ }
+
private static final Logger LOG = LoggerFactory.getLogger(NetconfNodeHandler.class);
private final @NonNull List<SchemaSourceRegistration<?>> yanglibRegistrations;
- private final @NonNull NetconfClientDispatcher clientDispatcher;
+ private final @NonNull NetconfClientFactory clientFactory;
private final @NonNull NetconfClientConfiguration clientConfig;
private final @NonNull NetconfDeviceCommunicator communicator;
private final @NonNull RemoteDeviceHandler delegate;
- private final @NonNull EventExecutor eventExecutor;
+ private final @NonNull NetconfTimer timer;
private final @NonNull RemoteDeviceId deviceId;
+ private final long maxBackoff;
private final long maxAttempts;
- private final int minSleep;
- private final double sleepFactor;
+ private final int minBackoff;
+ private final double backoffMultiplier;
+ private final double jitter;
@GuardedBy("this")
private long attempts;
@GuardedBy("this")
- private long lastSleep;
+ private long lastBackoff;
@GuardedBy("this")
- private Future<?> currentTask;
+ private Task currentTask;
- public NetconfNodeHandler(final NetconfClientDispatcher clientDispatcher, final EventExecutor eventExecutor,
- final ScheduledExecutorService keepaliveExecutor, final BaseNetconfSchemas baseSchemas,
- final SchemaResourceManager schemaManager, final Executor processingExecutor,
+ public NetconfNodeHandler(final NetconfClientFactory clientFactory, final NetconfTimer timer,
+ final BaseNetconfSchemas baseSchemas, final SchemaResourceManager schemaManager,
+ final NetconfTopologySchemaAssembler schemaAssembler,
final NetconfClientConfigurationBuilderFactory builderFactory,
final DeviceActionFactory deviceActionFactory, final RemoteDeviceHandler delegate,
final RemoteDeviceId deviceId, final NodeId nodeId, final NetconfNode node,
final NetconfNodeAugmentedOptional nodeOptional) {
- this.clientDispatcher = requireNonNull(clientDispatcher);
- this.eventExecutor = requireNonNull(eventExecutor);
+ this.clientFactory = requireNonNull(clientFactory);
+ this.timer = requireNonNull(timer);
this.delegate = requireNonNull(delegate);
this.deviceId = requireNonNull(deviceId);
maxAttempts = node.requireMaxConnectionAttempts().toJava();
- minSleep = node.requireBetweenAttemptsTimeoutMillis().toJava();
- sleepFactor = node.requireSleepFactor().doubleValue();
+ minBackoff = node.requireMinBackoffMillis().toJava();
+ backoffMultiplier = node.requireBackoffMultiplier().doubleValue();
+ final long potentialMaxBackoff = node.requireMaxBackoffMillis().toJava();
+ maxBackoff = potentialMaxBackoff >= minBackoff ? potentialMaxBackoff : minBackoff;
+ jitter = node.getBackoffJitter().doubleValue();
// Setup reconnection on empty context, if so configured
// FIXME: NETCONF-925: implement this
final long keepaliveDelay = node.requireKeepaliveDelay().toJava();
if (keepaliveDelay > 0) {
LOG.info("Adding keepalive facade, for device {}", nodeId);
- salFacade = keepAliveFacade = new KeepaliveSalFacade(deviceId, this, keepaliveExecutor, keepaliveDelay,
+ salFacade = keepAliveFacade = new KeepaliveSalFacade(deviceId, this, timer, keepaliveDelay,
node.requireDefaultRequestTimeoutMillis().toJava());
} else {
salFacade = this;
device = new NetconfDeviceBuilder()
.setReconnectOnSchemasChange(node.requireReconnectOnChangedSchema())
.setSchemaResourcesDTO(resources)
- .setGlobalProcessingExecutor(processingExecutor)
+ .setGlobalProcessingExecutor(schemaAssembler.executor())
.setId(deviceId)
.setSalFacade(salFacade)
.setDeviceActionFactory(deviceActionFactory)
public synchronized void connect() {
attempts = 1;
- lastSleep = minSleep;
+ lastBackoff = minBackoff; // Restart the backoff sequence from the configured minimum
lockedConnect();
}
@Holding("this")
private void lockedConnect() {
- currentTask = clientDispatcher.createClient(clientConfig);
- currentTask.addListener(this::connectComplete);
+ final ListenableFuture<NetconfClientSession> connectFuture;
+ try {
+ connectFuture = clientFactory.createClient(clientConfig);
+ } catch (UnsupportedConfigurationException e) {
+ onDeviceFailed(e); // No future was created, so the failure is reported directly and currentTask is not set
+ return;
+ }
+
+ final var nextTask = new ConnectingTask(connectFuture);
+ currentTask = nextTask; // Must be recorded before the callback is added: completeTask() compares against currentTask
+ Futures.addCallback(connectFuture, nextTask, MoreExecutors.directExecutor()); // directExecutor(): callback runs inline on the completing thread, or right here if the future is already done
}
- private void connectComplete(final Future<?> future) {
- // Locked manipulation of internal state
- synchronized (this) {
- // A quick sanity check
- if (currentTask != future) {
- LOG.warn("Ignoring connection completion, expected {} actual {}", future, currentTask);
- return;
- }
+ private synchronized void connectComplete(final ConnectingTask task) { // Success/cancellation path: no retry is scheduled
+ // Just clear the task, if it matches our expectation
+ completeTask(task);
+ }
- currentTask = null;
- final var cause = future.cause();
- if (cause == null || cause instanceof CancellationException) {
- // Success or cancellation, nothing else to do.
- // In case of success the rest of the setup is driven by RemoteDeviceHandler callbacks
+ private void connectFailed(final ConnectingTask task, final Throwable cause) { // Failure path: clear the task, then retry or give up
+ synchronized (this) { // Lock covers only the state update; reconnectOrFail() below is called after it is released
+ if (completeTask(task)) { // true means the task was NOT the current one
+ // Mismatched future or the connection has been cancelled: nothing else to do
return;
}
-
LOG.debug("Connection attempt {} to {} failed", attempts, deviceId, cause);
}
reconnectOrFail();
}
+ @Holding("this")
+ private boolean completeTask(final ConnectingTask task) { // Note the inverted sense: false = task was current and has been cleared, true = completion ignored
+ // A quick sanity check
+ if (task.equals(currentTask)) { // Also false when currentTask is null, e.g. after removeRegistration() cleared it
+ currentTask = null;
+ return false;
+ }
+ LOG.warn("Ignoring connection completion, expected {} actual {}", currentTask, task);
+ return true;
+ }
+
@Override
protected synchronized void removeRegistration() {
if (currentTask != null) {
- currentTask.cancel(false);
+ currentTask.cancel();
currentTask = null;
}
return null;
}
- final long delayMillis;
+ final long backoffMillis;
// We have exceeded the number of connection attempts
if (maxAttempts > 0 && attempts >= maxAttempts) {
return new ConnectGivenUpException("Given up connecting " + deviceId + " after " + attempts + " attempts");
}
- // First connection attempt gets initialized to minimum sleep, each subsequent is exponentially backed off
- // by sleepFactor.
+ // First connection attempt gets initialized to minimum backoff, each subsequent is exponentially backed off
+ // by backoffMultiplier (default 1.5) until it reaches maxBackoff, and is randomized by +/- jitter (default 0.1).
if (attempts != 0) {
- final long nextSleep = (long) (lastSleep * sleepFactor);
- // check for overflow
- delayMillis = nextSleep >= 0 ? nextSleep : Long.MAX_VALUE;
+ final var currentBackoff = Math.min(lastBackoff * backoffMultiplier, maxBackoff);
+ backoffMillis = (long) (currentBackoff * (Math.random() * (jitter * 2) + (1 - jitter)));
} else {
- delayMillis = minSleep;
+ backoffMillis = minBackoff;
}
attempts++;
- lastSleep = delayMillis;
- LOG.debug("Retrying {} connection attempt {} after {} milliseconds", deviceId, attempts, delayMillis);
+ lastBackoff = backoffMillis;
+ LOG.debug("Retrying {} connection attempt {} after {} milliseconds", deviceId, attempts, backoffMillis);
// Schedule a task for the right time. We always go through the executor to eliminate the special case of
// immediate reconnect. While we could check and got to lockedConnect(), it makes for a rare special case.
// That special case makes for more code paths to test and introduces additional uncertainty as to whether
- // the attempt was executed on on this thread or not.
- currentTask = eventExecutor.schedule(this::reconnect, delayMillis, TimeUnit.MILLISECONDS);
+ // the attempt was executed on this thread or not.
+ currentTask = new SleepingTask(timer.newTimeout(this::reconnect, backoffMillis, TimeUnit.MILLISECONDS));
return null;
}
- private synchronized void reconnect() {
+ private synchronized void reconnect(final Timeout timeout) {
currentTask = null;
if (notClosed()) {
lockedConnect();