Introduce NetconfTimer
[netconf.git] / apps / netconf-topology / src / main / java / org / opendaylight / netconf / topology / spi / NetconfNodeHandler.java
index 47f980c66039ce8fa1c47e8e1077254aab9e1039..9084c4a158ddd41058281059b13ae3fda9c32363 100644 (file)
@@ -10,19 +10,21 @@ package org.opendaylight.netconf.topology.spi;
 import static java.util.Objects.requireNonNull;
 
 import com.google.common.annotations.VisibleForTesting;
-import io.netty.util.concurrent.EventExecutor;
-import io.netty.util.concurrent.Future;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
+import io.netty.util.Timeout;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.CancellationException;
-import java.util.concurrent.Executor;
-import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import org.checkerframework.checker.lock.qual.GuardedBy;
 import org.checkerframework.checker.lock.qual.Holding;
 import org.eclipse.jdt.annotation.NonNull;
 import org.opendaylight.mdsal.dom.api.DOMNotification;
-import org.opendaylight.netconf.client.NetconfClientDispatcher;
+import org.opendaylight.netconf.client.NetconfClientFactory;
+import org.opendaylight.netconf.client.NetconfClientSession;
 import org.opendaylight.netconf.client.conf.NetconfClientConfiguration;
 import org.opendaylight.netconf.client.mdsal.LibraryModulesSchemas;
 import org.opendaylight.netconf.client.mdsal.LibrarySchemaSourceProvider;
@@ -40,9 +42,11 @@ import org.opendaylight.netconf.client.mdsal.api.RemoteDeviceId;
 import org.opendaylight.netconf.client.mdsal.api.RemoteDeviceServices;
 import org.opendaylight.netconf.client.mdsal.api.SchemaResourceManager;
 import org.opendaylight.netconf.client.mdsal.spi.KeepaliveSalFacade;
+import org.opendaylight.netconf.common.NetconfTimer;
+import org.opendaylight.netconf.transport.api.UnsupportedConfigurationException;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.Uri;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.optional.rev221225.NetconfNodeAugmentedOptional;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev221225.NetconfNode;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev231121.NetconfNode;
 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
 import org.opendaylight.yangtools.concepts.AbstractRegistration;
 import org.opendaylight.yangtools.yang.model.repo.api.YangTextSchemaSource;
@@ -55,42 +59,92 @@ import org.slf4j.LoggerFactory;
  * All state associated with a NETCONF topology node. Each node handles its own reconnection.
  */
 public final class NetconfNodeHandler extends AbstractRegistration implements RemoteDeviceHandler {
+    private abstract static sealed class Task {
+
+        abstract void cancel();
+    }
+
+    private final class ConnectingTask extends Task implements FutureCallback<NetconfClientSession> {
+        private final ListenableFuture<NetconfClientSession> future;
+
+        ConnectingTask(final ListenableFuture<NetconfClientSession> future) {
+            this.future = requireNonNull(future);
+        }
+
+        @Override
+        void cancel() {
+            future.cancel(false);
+        }
+
+        @Override
+        public void onSuccess(final NetconfClientSession result) {
+            connectComplete(this);
+        }
+
+        @Override
+        public void onFailure(final Throwable cause) {
+            if (cause instanceof CancellationException) {
+                connectComplete(this);
+            } else {
+                connectFailed(this, cause);
+            }
+        }
+    }
+
+    private static final class SleepingTask extends Task {
+        private final Timeout timeout;
+
+        SleepingTask(final Timeout timeout) {
+            this.timeout = requireNonNull(timeout);
+        }
+
+        @Override
+        void cancel() {
+            timeout.cancel();
+        }
+    }
+
     private static final Logger LOG = LoggerFactory.getLogger(NetconfNodeHandler.class);
 
     private final @NonNull List<SchemaSourceRegistration<?>> yanglibRegistrations;
-    private final @NonNull NetconfClientDispatcher clientDispatcher;
+    private final @NonNull NetconfClientFactory clientFactory;
     private final @NonNull NetconfClientConfiguration clientConfig;
     private final @NonNull NetconfDeviceCommunicator communicator;
     private final @NonNull RemoteDeviceHandler delegate;
-    private final @NonNull EventExecutor eventExecutor;
+    private final @NonNull NetconfTimer timer;
     private final @NonNull RemoteDeviceId deviceId;
 
+    private final long maxBackoff;
     private final long maxAttempts;
-    private final int minSleep;
-    private final double sleepFactor;
+    private final int minBackoff;
+    private final double backoffMultiplier;
+    private final double jitter;
 
     @GuardedBy("this")
     private long attempts;
     @GuardedBy("this")
-    private long lastSleep;
+    private long lastBackoff;
     @GuardedBy("this")
-    private Future<?> currentTask;
+    private Task currentTask;
 
-    public NetconfNodeHandler(final NetconfClientDispatcher clientDispatcher, final EventExecutor eventExecutor,
-            final ScheduledExecutorService keepaliveExecutor, final BaseNetconfSchemas baseSchemas,
-            final SchemaResourceManager schemaManager, final Executor processingExecutor,
+    public NetconfNodeHandler(final NetconfClientFactory clientFactory, final NetconfTimer timer,
+            final BaseNetconfSchemas baseSchemas, final SchemaResourceManager schemaManager,
+            final NetconfTopologySchemaAssembler schemaAssembler,
             final NetconfClientConfigurationBuilderFactory builderFactory,
             final DeviceActionFactory deviceActionFactory, final RemoteDeviceHandler delegate,
             final RemoteDeviceId deviceId, final NodeId nodeId, final NetconfNode node,
             final NetconfNodeAugmentedOptional nodeOptional) {
-        this.clientDispatcher = requireNonNull(clientDispatcher);
-        this.eventExecutor = requireNonNull(eventExecutor);
+        this.clientFactory = requireNonNull(clientFactory);
+        this.timer = requireNonNull(timer);
         this.delegate = requireNonNull(delegate);
         this.deviceId = requireNonNull(deviceId);
 
         maxAttempts = node.requireMaxConnectionAttempts().toJava();
-        minSleep = node.requireBetweenAttemptsTimeoutMillis().toJava();
-        sleepFactor = node.requireSleepFactor().doubleValue();
+        minBackoff = node.requireMinBackoffMillis().toJava();
+        backoffMultiplier = node.requireBackoffMultiplier().doubleValue();
+        final long potentialMaxBackoff = node.requireMaxBackoffMillis().toJava();
+        maxBackoff = potentialMaxBackoff >= minBackoff ? potentialMaxBackoff : minBackoff;
+        jitter = node.getBackoffJitter().doubleValue();
 
         // Setup reconnection on empty context, if so configured
         // FIXME: NETCONF-925: implement this
@@ -104,7 +158,7 @@ public final class NetconfNodeHandler extends AbstractRegistration implements Re
         final long keepaliveDelay = node.requireKeepaliveDelay().toJava();
         if (keepaliveDelay > 0) {
             LOG.info("Adding keepalive facade, for device {}", nodeId);
-            salFacade = keepAliveFacade = new KeepaliveSalFacade(deviceId, this, keepaliveExecutor, keepaliveDelay,
+            salFacade = keepAliveFacade = new KeepaliveSalFacade(deviceId, this, timer, keepaliveDelay,
                 node.requireDefaultRequestTimeoutMillis().toJava());
         } else {
             salFacade = this;
@@ -120,7 +174,7 @@ public final class NetconfNodeHandler extends AbstractRegistration implements Re
             device = new NetconfDeviceBuilder()
                 .setReconnectOnSchemasChange(node.requireReconnectOnChangedSchema())
                 .setSchemaResourcesDTO(resources)
-                .setGlobalProcessingExecutor(processingExecutor)
+                .setGlobalProcessingExecutor(schemaAssembler.executor())
                 .setId(deviceId)
                 .setSalFacade(salFacade)
                 .setDeviceActionFactory(deviceActionFactory)
@@ -148,33 +202,36 @@ public final class NetconfNodeHandler extends AbstractRegistration implements Re
 
     public synchronized void connect() {
         attempts = 1;
-        lastSleep = minSleep;
+        lastBackoff = minBackoff;
         lockedConnect();
     }
 
     @Holding("this")
     private void lockedConnect() {
-        currentTask = clientDispatcher.createClient(clientConfig);
-        currentTask.addListener(this::connectComplete);
+        final ListenableFuture<NetconfClientSession> connectFuture;
+        try {
+            connectFuture = clientFactory.createClient(clientConfig);
+        } catch (UnsupportedConfigurationException e) {
+            onDeviceFailed(e);
+            return;
+        }
+
+        final var nextTask = new ConnectingTask(connectFuture);
+        currentTask = nextTask;
+        Futures.addCallback(connectFuture, nextTask, MoreExecutors.directExecutor());
     }
 
-    private void connectComplete(final Future<?> future) {
-        // Locked manipulation of internal state
-        synchronized (this) {
-            // A quick sanity check
-            if (currentTask != future) {
-                LOG.warn("Ignoring connection completion, expected {} actual {}", future, currentTask);
-                return;
-            }
+    private synchronized void connectComplete(final ConnectingTask task) {
+        // Just clear the task, if it matches our expectation
+        completeTask(task);
+    }
 
-            currentTask = null;
-            final var cause = future.cause();
-            if (cause == null || cause instanceof CancellationException) {
-                // Success or cancellation, nothing else to do.
-                // In case of success the rest of the setup is driven by RemoteDeviceHandler callbacks
+    private void connectFailed(final ConnectingTask task, final Throwable cause) {
+        synchronized (this) {
+            if (completeTask(task)) {
+                // Mismatched future or the connection has been cancelled: nothing else to do
                 return;
             }
-
             LOG.debug("Connection attempt {} to {} failed", attempts, deviceId, cause);
         }
 
@@ -182,10 +239,21 @@ public final class NetconfNodeHandler extends AbstractRegistration implements Re
         reconnectOrFail();
     }
 
+    @Holding("this")
+    private boolean completeTask(final ConnectingTask task) {
+        // A quick sanity check
+        if (task.equals(currentTask)) {
+            currentTask = null;
+            return false;
+        }
+        LOG.warn("Ignoring connection completion, expected {} actual {}", currentTask, task);
+        return true;
+    }
+
     @Override
     protected synchronized void removeRegistration() {
         if (currentTask != null) {
-            currentTask.cancel(false);
+            currentTask.cancel();
             currentTask = null;
         }
 
@@ -233,7 +301,7 @@ public final class NetconfNodeHandler extends AbstractRegistration implements Re
             return null;
         }
 
-        final long delayMillis;
+        final long backoffMillis;
 
         // We have exceeded the number of connection attempts
         if (maxAttempts > 0 && attempts >= maxAttempts) {
@@ -241,29 +309,28 @@ public final class NetconfNodeHandler extends AbstractRegistration implements Re
             return new ConnectGivenUpException("Given up connecting " + deviceId + " after " + attempts + " attempts");
         }
 
-        // First connection attempt gets initialized to minimum sleep, each subsequent is exponentially backed off
-        // by sleepFactor.
+        // First connection attempt gets initialized to minimum backoff, each subsequent is exponentially backed off
+        // by backoffMultiplier (default 1.5) until reach max sleep and randomized by +/- jitter (default 0.1).
         if (attempts != 0) {
-            final long nextSleep = (long) (lastSleep * sleepFactor);
-            // check for overflow
-            delayMillis = nextSleep >= 0 ? nextSleep : Long.MAX_VALUE;
+            final var currentBackoff = Math.min(lastBackoff * backoffMultiplier, maxBackoff);
+            backoffMillis = (long) (currentBackoff * (Math.random() * (jitter * 2) + (1 - jitter)));
         } else {
-            delayMillis = minSleep;
+            backoffMillis = minBackoff;
         }
 
         attempts++;
-        lastSleep = delayMillis;
-        LOG.debug("Retrying {} connection attempt {} after {} milliseconds", deviceId, attempts, delayMillis);
+        lastBackoff = backoffMillis;
+        LOG.debug("Retrying {} connection attempt {} after {} milliseconds", deviceId, attempts, backoffMillis);
 
         // Schedule a task for the right time. We always go through the executor to eliminate the special case of
         // immediate reconnect. While we could check and got to lockedConnect(), it makes for a rare special case.
         // That special case makes for more code paths to test and introduces additional uncertainty as to whether
-        // the attempt was executed on on this thread or not.
-        currentTask = eventExecutor.schedule(this::reconnect, delayMillis, TimeUnit.MILLISECONDS);
+        // the attempt was executed on this thread or not.
+        currentTask = new SleepingTask(timer.newTimeout(this::reconnect, backoffMillis, TimeUnit.MILLISECONDS));
         return null;
     }
 
-    private synchronized void reconnect() {
+    private synchronized void reconnect(final Timeout timeout) {
         currentTask = null;
         if (notClosed()) {
             lockedConnect();