2 * Copyright (c) 2020 PANTHEON.tech, s.r.o. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.netconf.topology.spi;
10 import static java.util.Objects.requireNonNull;
12 import com.google.common.annotations.VisibleForTesting;
13 import com.google.common.util.concurrent.FutureCallback;
14 import com.google.common.util.concurrent.Futures;
15 import com.google.common.util.concurrent.ListenableFuture;
16 import com.google.common.util.concurrent.MoreExecutors;
17 import io.netty.util.Timeout;
18 import java.util.ArrayList;
19 import java.util.List;
20 import java.util.concurrent.CancellationException;
21 import java.util.concurrent.TimeUnit;
22 import org.checkerframework.checker.lock.qual.GuardedBy;
23 import org.checkerframework.checker.lock.qual.Holding;
24 import org.eclipse.jdt.annotation.NonNull;
25 import org.opendaylight.mdsal.dom.api.DOMNotification;
26 import org.opendaylight.netconf.client.NetconfClientFactory;
27 import org.opendaylight.netconf.client.NetconfClientSession;
28 import org.opendaylight.netconf.client.conf.NetconfClientConfiguration;
29 import org.opendaylight.netconf.client.mdsal.LibraryModulesSchemas;
30 import org.opendaylight.netconf.client.mdsal.LibrarySchemaSourceProvider;
31 import org.opendaylight.netconf.client.mdsal.NetconfDeviceBuilder;
32 import org.opendaylight.netconf.client.mdsal.NetconfDeviceCommunicator;
33 import org.opendaylight.netconf.client.mdsal.NetconfDeviceSchema;
34 import org.opendaylight.netconf.client.mdsal.SchemalessNetconfDevice;
35 import org.opendaylight.netconf.client.mdsal.api.BaseNetconfSchemaProvider;
36 import org.opendaylight.netconf.client.mdsal.api.DeviceActionFactory;
37 import org.opendaylight.netconf.client.mdsal.api.DeviceNetconfSchemaProvider;
38 import org.opendaylight.netconf.client.mdsal.api.NetconfSessionPreferences;
39 import org.opendaylight.netconf.client.mdsal.api.RemoteDevice;
40 import org.opendaylight.netconf.client.mdsal.api.RemoteDeviceHandler;
41 import org.opendaylight.netconf.client.mdsal.api.RemoteDeviceId;
42 import org.opendaylight.netconf.client.mdsal.api.RemoteDeviceServices;
43 import org.opendaylight.netconf.client.mdsal.api.SchemaResourceManager;
44 import org.opendaylight.netconf.client.mdsal.spi.KeepaliveSalFacade;
45 import org.opendaylight.netconf.common.NetconfTimer;
46 import org.opendaylight.netconf.transport.api.UnsupportedConfigurationException;
47 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.Uri;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.optional.rev221225.NetconfNodeAugmentedOptional;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev231121.NetconfNode;
50 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
51 import org.opendaylight.yangtools.concepts.AbstractRegistration;
52 import org.opendaylight.yangtools.concepts.Registration;
53 import org.opendaylight.yangtools.yang.model.api.source.YangTextSource;
54 import org.opendaylight.yangtools.yang.model.repo.spi.PotentialSchemaSource;
55 import org.slf4j.Logger;
56 import org.slf4j.LoggerFactory;
59 * All state associated with a NETCONF topology node. Each node handles its own reconnection.
61 public final class NetconfNodeHandler extends AbstractRegistration implements RemoteDeviceHandler {
// Base type of the work item stored in currentTask. Being sealed, its set of subclasses is closed;
// ConnectingTask and SleepingTask are the ones declared in this class.
62 private abstract static sealed class Task {
// Cancels this task; removeRegistration() calls this on the current task when there is one.
64 abstract void cancel();
// Task tracking one outstanding connection attempt. lockedConnect() registers it as the callback of the
// connection future, and it forwards the outcome to the enclosing handler.
67 private final class ConnectingTask extends Task implements FutureCallback<NetconfClientSession> {
68 private final ListenableFuture<NetconfClientSession> future;
70 ConnectingTask(final ListenableFuture<NetconfClientSession> future) {
71 this.future = requireNonNull(future);
// The session object is not used here: success is reported through connectComplete()
80 public void onSuccess(final NetconfClientSession result) {
81 connectComplete(this);
85 public void onFailure(final Throwable cause) {
// A cancelled future is routed to connectComplete()
86 if (cause instanceof CancellationException) {
87 connectComplete(this);
// The failure cause is handed to connectFailed()
89 connectFailed(this, cause);
// Task wrapping a timer Timeout. scheduleReconnect() creates one around the timeout that will invoke
// reconnect() once the backoff delay has elapsed.
94 private static final class SleepingTask extends Task {
95 private final Timeout timeout;
97 SleepingTask(final Timeout timeout) {
98 this.timeout = requireNonNull(timeout);
107 private static final Logger LOG = LoggerFactory.getLogger(NetconfNodeHandler.class);
// Schema source registrations created by registerDeviceSchemaSources(); closed in removeRegistration()
109 private final @NonNull List<Registration> yanglibRegistrations;
// Factory and configuration used by lockedConnect() to create the NETCONF client
110 private final @NonNull NetconfClientFactory clientFactory;
111 private final @NonNull NetconfClientConfiguration clientConfig;
// Installed as the session listener of clientConfig; closed in removeRegistration()
112 private final @NonNull NetconfDeviceCommunicator communicator;
// Receiver of the RemoteDeviceHandler callbacks this class forwards
113 private final @NonNull RemoteDeviceHandler delegate;
// Timer used to schedule the reconnection delay; also handed to the keepalive facade
114 private final @NonNull NetconfTimer timer;
// Identifier of the device, used when creating the device objects and in log messages
115 private final @NonNull RemoteDeviceId deviceId;
// Reconnection parameters read from the NetconfNode configuration in the constructor
117 private final long maxBackoff;
118 private final long maxAttempts;
119 private final int minBackoff;
120 private final double backoffMultiplier;
121 private final double jitter;
// Connection attempt counter; compared against maxAttempts in scheduleReconnect()
124 private long attempts;
// Most recent backoff in milliseconds; the base for computing the next reconnection delay
126 private long lastBackoff;
// The connection attempt or reconnection sleep currently in progress; null when there is none
128 private Task currentTask;
// Creates the handler for a single topology node: reads the reconnection parameters from the node
// configuration, builds the device facade (optionally wrapped in a keepalive facade), the device itself,
// the communicator and finally the client configuration used for connection attempts.
130 public NetconfNodeHandler(final NetconfClientFactory clientFactory, final NetconfTimer timer,
131 final BaseNetconfSchemaProvider baseSchemaProvider, final SchemaResourceManager schemaManager,
132 final NetconfTopologySchemaAssembler schemaAssembler,
133 final NetconfClientConfigurationBuilderFactory builderFactory,
134 final DeviceActionFactory deviceActionFactory, final RemoteDeviceHandler delegate,
135 final RemoteDeviceId deviceId, final NodeId nodeId, final NetconfNode node,
136 final NetconfNodeAugmentedOptional nodeOptional) {
137 this.clientFactory = requireNonNull(clientFactory);
138 this.timer = requireNonNull(timer);
139 this.delegate = requireNonNull(delegate);
140 this.deviceId = requireNonNull(deviceId);
// Reconnection parameters come from the node configuration
142 maxAttempts = node.requireMaxConnectionAttempts().toJava();
143 minBackoff = node.requireMinBackoffMillis().toJava();
144 backoffMultiplier = node.requireBackoffMultiplier().doubleValue();
145 final long potentialMaxBackoff = node.requireMaxBackoffMillis().toJava();
// Never let the maximum backoff fall below the minimum backoff
146 maxBackoff = potentialMaxBackoff >= minBackoff ? potentialMaxBackoff : minBackoff;
147 jitter = node.getBackoffJitter().doubleValue();
149 // Setup reconnection on empty context, if so configured
150 // FIXME: NETCONF-925: implement this
151 if (nodeOptional != null && nodeOptional.getIgnoreMissingSchemaSources().getAllowed()) {
152 LOG.warn("Ignoring missing schema sources is not currently implemented for {}", deviceId);
155 // The facade we are going to present to NetconfDevice
156 RemoteDeviceHandler salFacade;
157 final KeepaliveSalFacade keepAliveFacade;
158 final long keepaliveDelay = node.requireKeepaliveDelay().toJava();
// A positive keepalive delay places a KeepaliveSalFacade in front of this handler
159 if (keepaliveDelay > 0) {
160 LOG.info("Adding keepalive facade, for device {}", nodeId);
161 salFacade = keepAliveFacade = new KeepaliveSalFacade(deviceId, this, timer, keepaliveDelay,
162 node.requireDefaultRequestTimeoutMillis().toJava());
// No keepalive facade is created otherwise
165 keepAliveFacade = null;
168 final RemoteDevice<NetconfDeviceCommunicator> device;
// Schemaless devices get no YANG library schema source registrations
169 if (node.requireSchemaless()) {
170 device = new SchemalessNetconfDevice(baseSchemaProvider, deviceId, salFacade);
171 yanglibRegistrations = List.of();
// Schema-aware devices use per-node schema resources obtained from the schema manager
173 final var resources = schemaManager.getSchemaResources(node.getSchemaCacheDirectory(), nodeId.getValue());
174 device = new NetconfDeviceBuilder()
175 .setReconnectOnSchemasChange(node.requireReconnectOnChangedSchema())
176 .setBaseSchemaProvider(baseSchemaProvider)
177 .setDeviceSchemaProvider(resources)
178 .setProcessingExecutor(schemaAssembler.executor())
180 .setSalFacade(salFacade)
181 .setDeviceActionFactory(deviceActionFactory)
183 yanglibRegistrations = registerDeviceSchemaSources(deviceId, node, resources);
186 final int rpcMessageLimit = node.requireConcurrentRpcLimit().toJava();
// A limit below 1 is only logged here; the value is still passed to the communicator as-is
187 if (rpcMessageLimit < 1) {
188 LOG.info("Concurrent rpc limit is smaller than 1, no limit will be enforced for device {}", deviceId);
191 communicator = new NetconfDeviceCommunicator(deviceId, device, rpcMessageLimit,
192 NetconfNodeUtils.extractUserCapabilities(node));
// The keepalive facade, when present, needs the communicator as its listener
194 if (keepAliveFacade != null) {
195 keepAliveFacade.setListener(communicator);
// The communicator listens to the sessions established with this configuration
198 clientConfig = builderFactory.createClientConfigurationBuilder(nodeId, node)
199 .withSessionListener(communicator)
// Public, synchronized connection entry point; resets the backoff to the configured minimum.
203 public synchronized void connect() {
205 lastBackoff = minBackoff;
// Asks the client factory for a new client and tracks the resulting future as the current task.
210 private void lockedConnect() {
211 final ListenableFuture<NetconfClientSession> connectFuture;
213 connectFuture = clientFactory.createClient(clientConfig);
// createClient() can reject the configuration
214 } catch (UnsupportedConfigurationException e) {
219 final var nextTask = new ConnectingTask(connectFuture);
// The task is recorded before the callback is attached. NOTE(review): presumably because a direct
// executor may run the callback right away when the future is already complete - confirm.
220 currentTask = nextTask;
221 Futures.addCallback(connectFuture, nextTask, MoreExecutors.directExecutor());
// Called by ConnectingTask when its future succeeds or has been cancelled.
224 private synchronized void connectComplete(final ConnectingTask task) {
225 // Just clear the task, if it matches our expectation
// Called by ConnectingTask.onFailure() with the cause of the failed connection attempt.
229 private void connectFailed(final ConnectingTask task, final Throwable cause) {
// Task bookkeeping and the debug log happen while holding the handler's monitor
230 synchronized (this) {
231 if (completeTask(task)) {
232 // Mismatched future or the connection has been cancelled: nothing else to do
235 LOG.debug("Connection attempt {} to {} failed", attempts, deviceId, cause);
238 // We are invoking callbacks, do not hold locks
// Checks a finished ConnectingTask against currentTask and logs a warning when the two do not match.
243 private boolean completeTask(final ConnectingTask task) {
244 // A quick sanity check
245 if (task.equals(currentTask)) {
249 LOG.warn("Ignoring connection completion, expected {} actual {}", currentTask, task);
// Tears the handler down: cancels the task in progress, closes the communicator and releases the
// YANG library schema source registrations.
254 protected synchronized void removeRegistration() {
// There may be no connection attempt or reconnection sleep in progress
255 if (currentTask != null) {
256 currentTask.cancel();
260 communicator.close();
262 yanglibRegistrations.forEach(Registration::close);
// Updates handler state under the handler's monitor, then forwards the event to the delegate.
266 public void onDeviceConnected(final NetconfDeviceSchema deviceSchema,
267 final NetconfSessionPreferences sessionPreferences, final RemoteDeviceServices services) {
268 synchronized (this) {
// The delegate receives the same arguments this handler was given
271 delegate.onDeviceConnected(deviceSchema, sessionPreferences, services);
// Forwards the disconnection event to the delegate.
275 public void onDeviceDisconnected() {
276 delegate.onDeviceDisconnected();
// Handles a device failure reported to this handler; the cause is logged at debug level.
281 public void onDeviceFailed(final Throwable throwable) {
282 // We have not reported onDeviceConnected(), so from the view of the delegate we are still connecting
283 LOG.debug("Connection attempt failed", throwable);
// Forwards the notification to the delegate unchanged.
288 public void onNotification(final DOMNotification domNotification) {
289 delegate.onNotification(domNotification);
// Tries to schedule another connection attempt; an exception handed back by scheduleReconnect() is
// reported to the delegate as a device failure.
292 private void reconnectOrFail() {
293 final var ex = scheduleReconnect();
295 delegate.onDeviceFailed(ex);
// Decides whether another connection attempt is allowed and, if so, computes the backoff delay and
// schedules reconnect() on the timer. Returns a ConnectGivenUpException when the attempt limit is reached.
299 private synchronized Exception scheduleReconnect() {
304 final long backoffMillis;
306 // We have exceeded the number of connection attempts
// A maxAttempts value of zero or less means the limit is not enforced
307 if (maxAttempts > 0 && attempts >= maxAttempts) {
308 LOG.info("Failed to connect {} after {} attempts, not attempting", deviceId, attempts);
309 return new ConnectGivenUpException("Given up connecting " + deviceId + " after " + attempts + " attempts");
312 // First connection attempt gets initialized to minimum backoff, each subsequent is exponentially backed off
313 // by backoffMultiplier (default 1.5) until it reaches the maximum backoff, and is randomized by +/- jitter
// (default 0.1).
// The previous backoff is scaled by the multiplier and capped at maxBackoff
315 final var currentBackoff = Math.min(lastBackoff * backoffMultiplier, maxBackoff);
// The random factor lies in the range [1 - jitter, 1 + jitter)
316 backoffMillis = (long) (currentBackoff * (Math.random() * (jitter * 2) + (1 - jitter)));
318 backoffMillis = minBackoff;
// The delay actually used, jitter included, becomes the base for the next computation
322 lastBackoff = backoffMillis;
323 LOG.debug("Retrying {} connection attempt {} after {} milliseconds", deviceId, attempts, backoffMillis);
325 // Schedule a task for the right time. We always go through the executor to eliminate the special case of
326 // immediate reconnect. While we could check and go to lockedConnect(), it makes for a rare special case.
327 // That special case makes for more code paths to test and introduces additional uncertainty as to whether
328 // the attempt was executed on this thread or not.
329 currentTask = new SleepingTask(timer.newTimeout(this::reconnect, backoffMillis, TimeUnit.MILLISECONDS));
// Timer callback: scheduleReconnect() passes this method to timer.newTimeout() for the backoff delay.
333 private synchronized void reconnect(final Timeout timeout) {
// Registers the models advertised by the node's configured YANG library as potential schema sources in the
// registry of the supplied schema resources, and returns the resulting registrations.
340 private static List<Registration> registerDeviceSchemaSources(final RemoteDeviceId remoteDeviceId,
341 final NetconfNode node, final DeviceNetconfSchemaProvider resources) {
342 final var yangLibrary = node.getYangLibrary();
// The YANG library configuration is optional
343 if (yangLibrary != null) {
344 final Uri uri = yangLibrary.getYangLibraryUrl();
346 final var registrations = new ArrayList<Registration>();
347 final var yangLibURL = uri.getValue();
349 // pre register yang library sources as fallback schemas to schema registry
350 final var yangLibUsername = yangLibrary.getUsername();
351 final var yangLigPassword = yangLibrary.getPassword();
// Credentials are used only when both the username and the password are present
352 final var schemas = yangLibUsername != null && yangLigPassword != null
353 ? LibraryModulesSchemas.create(yangLibURL, yangLibUsername, yangLigPassword)
354 : LibraryModulesSchemas.create(yangLibURL);
356 final var registry = resources.registry();
// One registration per available model, each offered as YANG text at the REMOTE_IO cost
357 for (var entry : schemas.getAvailableModels().entrySet()) {
358 registrations.add(registry.registerSchemaSource(new LibrarySchemaSourceProvider(
359 remoteDeviceId, schemas.getAvailableModels()),
360 PotentialSchemaSource.create(entry.getKey(), YangTextSource.class,
361 PotentialSchemaSource.Costs.REMOTE_IO.getValue())));
// Hand out an unmodifiable copy of the collected registrations
363 return List.copyOf(registrations);
371 synchronized long attempts() {