2 * Copyright (c) 2020 PANTHEON.tech, s.r.o. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.netconf.topology.spi;
10 import static java.util.Objects.requireNonNull;
12 import com.google.common.annotations.VisibleForTesting;
13 import com.google.common.util.concurrent.FutureCallback;
14 import com.google.common.util.concurrent.Futures;
15 import com.google.common.util.concurrent.ListenableFuture;
16 import com.google.common.util.concurrent.MoreExecutors;
17 import io.netty.util.Timeout;
18 import io.netty.util.Timer;
19 import java.util.ArrayList;
20 import java.util.List;
21 import java.util.concurrent.CancellationException;
22 import java.util.concurrent.Executor;
23 import java.util.concurrent.TimeUnit;
24 import org.checkerframework.checker.lock.qual.GuardedBy;
25 import org.checkerframework.checker.lock.qual.Holding;
26 import org.eclipse.jdt.annotation.NonNull;
27 import org.opendaylight.mdsal.dom.api.DOMNotification;
28 import org.opendaylight.netconf.client.NetconfClientFactory;
29 import org.opendaylight.netconf.client.NetconfClientSession;
30 import org.opendaylight.netconf.client.conf.NetconfClientConfiguration;
31 import org.opendaylight.netconf.client.mdsal.LibraryModulesSchemas;
32 import org.opendaylight.netconf.client.mdsal.LibrarySchemaSourceProvider;
33 import org.opendaylight.netconf.client.mdsal.NetconfDevice.SchemaResourcesDTO;
34 import org.opendaylight.netconf.client.mdsal.NetconfDeviceBuilder;
35 import org.opendaylight.netconf.client.mdsal.NetconfDeviceCommunicator;
36 import org.opendaylight.netconf.client.mdsal.NetconfDeviceSchema;
37 import org.opendaylight.netconf.client.mdsal.SchemalessNetconfDevice;
38 import org.opendaylight.netconf.client.mdsal.api.BaseNetconfSchemas;
39 import org.opendaylight.netconf.client.mdsal.api.DeviceActionFactory;
40 import org.opendaylight.netconf.client.mdsal.api.NetconfSessionPreferences;
41 import org.opendaylight.netconf.client.mdsal.api.RemoteDevice;
42 import org.opendaylight.netconf.client.mdsal.api.RemoteDeviceHandler;
43 import org.opendaylight.netconf.client.mdsal.api.RemoteDeviceId;
44 import org.opendaylight.netconf.client.mdsal.api.RemoteDeviceServices;
45 import org.opendaylight.netconf.client.mdsal.api.SchemaResourceManager;
46 import org.opendaylight.netconf.client.mdsal.spi.KeepaliveSalFacade;
47 import org.opendaylight.netconf.transport.api.UnsupportedConfigurationException;
48 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.Uri;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.optional.rev221225.NetconfNodeAugmentedOptional;
50 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev231121.NetconfNode;
51 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
52 import org.opendaylight.yangtools.concepts.AbstractRegistration;
53 import org.opendaylight.yangtools.yang.model.repo.api.YangTextSchemaSource;
54 import org.opendaylight.yangtools.yang.model.repo.spi.PotentialSchemaSource;
55 import org.opendaylight.yangtools.yang.model.repo.spi.SchemaSourceRegistration;
56 import org.slf4j.Logger;
57 import org.slf4j.LoggerFactory;
60 * All state associated with a NETCONF topology node. Each node handles its own reconnection.
62 public final class NetconfNodeHandler extends AbstractRegistration implements RemoteDeviceHandler {
/**
 * Common base of the work items this handler can have pending (see {@code ConnectingTask} and
 * {@code SleepingTask}). The class is sealed, so only subclasses declared in this file may extend it.
 */
63 private abstract static sealed class Task {
// Abort the pending work; removeRegistration() invokes this on the current task when the handler is closed.
65 abstract void cancel();
/**
 * Task tracking one in-flight connection attempt. lockedConnect() creates it around the connect future and
 * registers it as that future's callback. It is an inner (non-static) class because its callbacks call
 * connectComplete()/connectFailed() on the enclosing handler.
 *
 * NOTE(review): the cancel() override is not visible in this excerpt -- presumably it cancels {@code future}; confirm.
 */
68 private final class ConnectingTask extends Task implements FutureCallback<NetconfClientSession> {
// The pending connection attempt this task represents; never null.
69 private final ListenableFuture<NetconfClientSession> future;
71 ConnectingTask(final ListenableFuture<NetconfClientSession> future) {
72 this.future = requireNonNull(future);
// The session itself is not used here: success only retires this task via connectComplete().
81 public void onSuccess(final NetconfClientSession result) {
82 connectComplete(this);
// A cancelled future is treated like a completion (task is retired, no failure handling).
// NOTE(review): the branch separating the two calls below is not visible here -- presumably an else, so that
// only non-cancellation failures reach connectFailed(); confirm.
86 public void onFailure(final Throwable cause) {
87 if (cause instanceof CancellationException) {
88 connectComplete(this);
90 connectFailed(this, cause);
/**
 * Task representing the wait before the next connection attempt. scheduleReconnect() creates it around the
 * {@code Timeout} returned by {@code timer.newTimeout(...)}.
 *
 * NOTE(review): the cancel() override is not visible in this excerpt -- presumably it cancels {@code timeout}; confirm.
 */
95 private static final class SleepingTask extends Task {
// Handle of the scheduled reconnect; never null.
96 private final Timeout timeout;
98 SleepingTask(final Timeout timeout) {
99 this.timeout = requireNonNull(timeout);
108 private static final Logger LOG = LoggerFactory.getLogger(NetconfNodeHandler.class);
// Yang-library schema source registrations made in the constructor; closed in removeRegistration().
110 private final @NonNull List<SchemaSourceRegistration<?>> yanglibRegistrations;
// Factory and configuration used by lockedConnect() to start each connection attempt.
111 private final @NonNull NetconfClientFactory clientFactory;
112 private final @NonNull NetconfClientConfiguration clientConfig;
// Session listener installed into clientConfig; closed in removeRegistration().
113 private final @NonNull NetconfDeviceCommunicator communicator;
// Downstream handler that the RemoteDeviceHandler callbacks of this class forward to.
114 private final @NonNull RemoteDeviceHandler delegate;
// Used for the optional keepalive facade and for scheduling reconnect attempts.
115 private final @NonNull Timer timer;
116 private final @NonNull RemoteDeviceId deviceId;
// Upper bound on connection attempts; scheduleReconnect() only enforces it when the value is greater than zero.
118 private final long maxAttempts;
// Delay, in milliseconds, that connect() seeds lastSleep with.
119 private final int minSleep;
// Multiplier applied to lastSleep when computing a backed-off delay.
120 private final double sleepFactor;
// Mutable reconnection state. The visible accesses are in synchronized methods/blocks or in helpers called from them.
// Connection attempt counter, reported in log messages and compared against maxAttempts.
123 private long attempts;
// Most recently used reconnect delay, in milliseconds.
125 private long lastSleep;
// The pending ConnectingTask or SleepingTask; removeRegistration() null-checks it before cancelling.
127 private Task currentTask;
/**
 * Wires up everything needed to connect to a single NETCONF node: reconnection parameters, the device
 * implementation (schemaless or schema-aware), the communicator acting as session listener, and the client
 * configuration. No connection is started here; that happens in connect().
 *
 * @param clientFactory factory used to create client sessions, must not be null
 * @param timer timer used for keepalive and reconnect scheduling, must not be null
 * @param baseSchemas base NETCONF schemas handed to the device implementation
 * @param schemaManager source of per-node schema resources (only consulted for non-schemaless nodes)
 * @param processingExecutor executor handed to the schema-aware device builder
 * @param builderFactory factory producing the client configuration builder for this node
 * @param deviceActionFactory action factory handed to the schema-aware device builder
 * @param delegate handler that device callbacks are forwarded to, must not be null
 * @param deviceId identifier of the remote device, must not be null
 * @param nodeId topology node identifier
 * @param node node configuration the settings are read from
 * @param nodeOptional optional augmentation, may be null
 * @throws NullPointerException if clientFactory, timer, delegate or deviceId is null
 */
129 public NetconfNodeHandler(final NetconfClientFactory clientFactory, final Timer timer,
130 final BaseNetconfSchemas baseSchemas, final SchemaResourceManager schemaManager,
131 final Executor processingExecutor, final NetconfClientConfigurationBuilderFactory builderFactory,
132 final DeviceActionFactory deviceActionFactory, final RemoteDeviceHandler delegate,
133 final RemoteDeviceId deviceId, final NodeId nodeId, final NetconfNode node,
134 final NetconfNodeAugmentedOptional nodeOptional) {
135 this.clientFactory = requireNonNull(clientFactory);
136 this.timer = requireNonNull(timer);
137 this.delegate = requireNonNull(delegate);
138 this.deviceId = requireNonNull(deviceId);
// Reconnection policy, consumed by connect() and scheduleReconnect()
140 maxAttempts = node.requireMaxConnectionAttempts().toJava();
141 minSleep = node.requireBetweenAttemptsTimeoutMillis().toJava();
142 sleepFactor = node.requireSleepFactor().doubleValue();
144 // Setup reconnection on empty context, if so configured
145 // FIXME: NETCONF-925: implement this
// NOTE(review): only nodeOptional itself is null-checked; if getIgnoreMissingSchemaSources() or getAllowed()
// can return null, the call chain below would throw -- confirm against the generated binding.
146 if (nodeOptional != null && nodeOptional.getIgnoreMissingSchemaSources().getAllowed()) {
147 LOG.warn("Ignoring missing schema sources is not currently implemented for {}", deviceId);
150 // The facade we are going to present to NetconfDevice
151 RemoteDeviceHandler salFacade;
152 final KeepaliveSalFacade keepAliveFacade;
153 final long keepaliveDelay = node.requireKeepaliveDelay().toJava();
// A positive keepalive delay wraps this handler in a KeepaliveSalFacade.
// NOTE(review): the assignment of salFacade on the no-keepalive path is not visible in this excerpt --
// presumably this handler itself; confirm.
154 if (keepaliveDelay > 0) {
155 LOG.info("Adding keepalive facade, for device {}", nodeId);
156 salFacade = keepAliveFacade = new KeepaliveSalFacade(deviceId, this, timer, keepaliveDelay,
157 node.requireDefaultRequestTimeoutMillis().toJava());
160 keepAliveFacade = null;
// Schemaless nodes get a SchemalessNetconfDevice and no yang-library registrations; other nodes get a device
// from NetconfDeviceBuilder plus the registrations returned by registerDeviceSchemaSources().
163 final RemoteDevice<NetconfDeviceCommunicator> device;
164 if (node.requireSchemaless()) {
165 device = new SchemalessNetconfDevice(baseSchemas, deviceId, salFacade);
166 yanglibRegistrations = List.of();
168 final var resources = schemaManager.getSchemaResources(node.getSchemaCacheDirectory(), nodeId.getValue());
169 device = new NetconfDeviceBuilder()
170 .setReconnectOnSchemasChange(node.requireReconnectOnChangedSchema())
171 .setSchemaResourcesDTO(resources)
172 .setGlobalProcessingExecutor(processingExecutor)
174 .setSalFacade(salFacade)
175 .setDeviceActionFactory(deviceActionFactory)
176 .setBaseSchemas(baseSchemas)
178 yanglibRegistrations = registerDeviceSchemaSources(deviceId, node, resources);
// Values below 1 are only logged here; the limit is passed to the communicator unchanged.
181 final int rpcMessageLimit = node.requireConcurrentRpcLimit().toJava();
182 if (rpcMessageLimit < 1) {
183 LOG.info("Concurrent rpc limit is smaller than 1, no limit will be enforced for device {}", deviceId);
186 communicator = new NetconfDeviceCommunicator(deviceId, device, rpcMessageLimit,
187 NetconfNodeUtils.extractUserCapabilities(node));
// The keepalive facade is created before the communicator exists, so its listener is set afterwards.
189 if (keepAliveFacade != null) {
190 keepAliveFacade.setListener(communicator);
// The communicator is installed as the session listener of every session created from this configuration.
193 clientConfig = builderFactory.createClientConfigurationBuilder(nodeId, node)
194 .withSessionListener(communicator)
/**
 * Starts connecting to the device, resetting the backoff delay to the configured minimum.
 *
 * NOTE(review): only the lastSleep reset is visible in this excerpt -- presumably the method also resets the
 * attempt counter and calls lockedConnect(); confirm.
 */
198 public synchronized void connect() {
200 lastSleep = minSleep;
/**
 * Issues one connection attempt through the client factory and records it as the current task.
 *
 * NOTE(review): the method is not synchronized itself; the name suggests callers must hold this object's monitor
 * (connect() is synchronized) -- confirm for all call sites.
 */
205 private void lockedConnect() {
206 final ListenableFuture<NetconfClientSession> connectFuture;
208 connectFuture = clientFactory.createClient(clientConfig);
// NOTE(review): the handling of this exception is not visible in this excerpt.
209 } catch (UnsupportedConfigurationException e) {
// Publish the task before attaching the callback: completeTask() compares the calling task to currentTask.
214 final var nextTask = new ConnectingTask(connectFuture);
215 currentTask = nextTask;
216 Futures.addCallback(connectFuture, nextTask, MoreExecutors.directExecutor());
/**
 * Called by a ConnectingTask when its future succeeds or is cancelled.
 *
 * NOTE(review): the statement performing the clearing is not visible in this excerpt -- presumably a call to
 * completeTask(task); confirm.
 */
219 private synchronized void connectComplete(final ConnectingTask task) {
220 // Just clear the task, if it matches our expectation
/**
 * Called by a ConnectingTask when its future fails with something other than cancellation. The task is retired
 * and the failure logged while holding the lock.
 *
 * NOTE(review): what follows the final comment is not visible in this excerpt -- presumably reconnectOrFail(),
 * invoked outside the synchronized block; confirm.
 *
 * @param task the task whose future failed
 * @param cause the failure reported by the future
 */
224 private void connectFailed(final ConnectingTask task, final Throwable cause) {
225 synchronized (this) {
// A true result from completeTask() is the "nothing else to do" case described below.
226 if (completeTask(task)) {
227 // Mismatched future or the connection has been cancelled: nothing else to do
230 LOG.debug("Connection attempt {} to {} failed", attempts, deviceId, cause);
233 // We are invoking callbacks, do not hold locks
/**
 * Checks whether the given task is the one currently recorded in currentTask and logs a warning when it is not.
 *
 * NOTE(review): the return statements and any update of currentTask are not visible in this excerpt. Judging
 * by the caller connectFailed(), true presumably means "mismatch, ignore"; confirm.
 *
 * @param task the connecting task reporting completion
 */
238 private boolean completeTask(final ConnectingTask task) {
239 // A quick sanity check
240 if (task.equals(currentTask)) {
244 LOG.warn("Ignoring connection completion, expected {} actual {}", currentTask, task);
/**
 * Releases what this handler holds: cancels the pending task (if any), closes the communicator and closes
 * every yang-library schema source registration. Runs while holding this object's monitor.
 */
249 protected synchronized void removeRegistration() {
// Stop an in-flight connection attempt or a scheduled reconnect
250 if (currentTask != null) {
251 currentTask.cancel();
255 communicator.close();
257 yanglibRegistrations.forEach(SchemaSourceRegistration::close);
/**
 * Forwards the connected event to the delegate.
 *
 * NOTE(review): the content of the synchronized block is not visible in this excerpt -- presumably it resets
 * reconnection state before the delegate is notified; confirm.
 */
261 public void onDeviceConnected(final NetconfDeviceSchema deviceSchema,
262 final NetconfSessionPreferences sessionPreferences, final RemoteDeviceServices services) {
263 synchronized (this) {
266 delegate.onDeviceConnected(deviceSchema, sessionPreferences, services);
/**
 * Forwards the disconnected event to the delegate.
 *
 * NOTE(review): any follow-up after the delegate call (for example scheduling a reconnect) is not visible in
 * this excerpt; confirm.
 */
270 public void onDeviceDisconnected() {
271 delegate.onDeviceDisconnected();
/**
 * Handles a failure reported before the device was announced as connected; the failure is logged at debug level.
 *
 * NOTE(review): the follow-up after the log statement is not visible in this excerpt -- presumably
 * reconnectOrFail(); confirm.
 *
 * @param throwable the reported failure
 */
276 public void onDeviceFailed(final Throwable throwable) {
277 // We have not reported onDeviceConnected(), so from the view of delegate we are still connecting
278 LOG.debug("Connection attempt failed", throwable);
/**
 * Passes a device notification straight through to the delegate.
 *
 * @param domNotification the notification received from the device
 */
283 public void onNotification(final DOMNotification domNotification) {
284 delegate.onNotification(domNotification);
/**
 * Asks scheduleReconnect() to arrange the next attempt and reports its result to the delegate as a failure.
 * The method itself is not synchronized, so the delegate call does not run under a lock taken here.
 *
 * NOTE(review): the condition guarding the delegate call is not visible in this excerpt -- presumably it is
 * made only when scheduleReconnect() returned an exception; confirm.
 */
287 private void reconnectOrFail() {
288 final var ex = scheduleReconnect();
290 delegate.onDeviceFailed(ex);
/**
 * Decides whether another connection attempt is allowed and, if so, schedules it on the timer. Runs while
 * holding this object's monitor.
 *
 * When maxAttempts is greater than zero and has been reached, a ConnectGivenUpException is returned (not
 * thrown). Otherwise a delay is chosen, remembered in lastSleep, and a SleepingTask wrapping the timer's
 * Timeout becomes the current task.
 *
 * NOTE(review): several statements are not visible in this excerpt: the opening of the method, the condition
 * choosing between the backed-off delay and minSleep, any update of the attempt counter, and the return on the
 * scheduling path (presumably null). Confirm before relying on this description.
 *
 * @return the exception describing why reconnection was given up
 */
294 private synchronized Exception scheduleReconnect() {
299 final long delayMillis;
301 // We have exceeded the number of connection attempts
302 if (maxAttempts > 0 && attempts >= maxAttempts) {
303 LOG.info("Failed to connect {} after {} attempts, not attempting", deviceId, attempts);
304 return new ConnectGivenUpException("Given up connecting " + deviceId + " after " + attempts + " attempts");
307 // First connection attempt gets initialized to minimum sleep, each subsequent is exponentially backed off
310 final long nextSleep = (long) (lastSleep * sleepFactor);
311 // check for overflow
// NOTE(review): a double-to-long cast saturates at Long.MAX_VALUE instead of wrapping, so nextSleep is
// negative only when the product itself is negative (e.g. a negative sleepFactor) -- confirm the intent.
312 delayMillis = nextSleep >= 0 ? nextSleep : Long.MAX_VALUE;
314 delayMillis = minSleep;
318 lastSleep = delayMillis;
319 LOG.debug("Retrying {} connection attempt {} after {} milliseconds", deviceId, attempts, delayMillis);
321 // Schedule a task for the right time. We always go through the executor to eliminate the special case of
322 // immediate reconnect. While we could check and go to lockedConnect(), it makes for a rare special case.
323 // That special case makes for more code paths to test and introduces additional uncertainty as to whether
324 // the attempt was executed on this thread or not.
325 currentTask = new SleepingTask(timer.newTimeout(this::reconnect, delayMillis, TimeUnit.MILLISECONDS));
/**
 * Timer callback: scheduleReconnect() passes this method to {@code timer.newTimeout(...)}, so it is invoked
 * with the expired Timeout once the reconnect delay elapses. Runs while holding this object's monitor.
 *
 * NOTE(review): the body is not visible in this excerpt -- presumably it starts the next attempt via
 * lockedConnect(); confirm.
 */
329 private synchronized void reconnect(final Timeout timeout) {
/**
 * Registers the models advertised by the node's configured yang library as schema sources, so they can be
 * looked up through the schema registry of the given resources.
 *
 * @param remoteDeviceId device the schema source provider is created for
 * @param node node configuration; its yang library section may be absent
 * @param resources schema resources whose registry receives the registrations
 * @return an unmodifiable copy of the created registrations.
 *         NOTE(review): the result for a node without a yang library (or without a URL) is not visible in this
 *         excerpt -- presumably an empty list; confirm.
 */
336 private static List<SchemaSourceRegistration<?>> registerDeviceSchemaSources(final RemoteDeviceId remoteDeviceId,
337 final NetconfNode node, final SchemaResourcesDTO resources) {
338 final var yangLibrary = node.getYangLibrary();
339 if (yangLibrary != null) {
// NOTE(review): no null check of uri is visible before uri.getValue() below -- confirm one exists.
340 final Uri uri = yangLibrary.getYangLibraryUrl();
342 final var registrations = new ArrayList<SchemaSourceRegistration<?>>();
343 final var yangLibURL = uri.getValue();
344 final var schemaRegistry = resources.getSchemaRegistry();
346 // pre register yang library sources as fallback schemas to schema registry
// Credentials are used only when both username and password are configured
347 final var yangLibUsername = yangLibrary.getUsername();
348 final var yangLigPassword = yangLibrary.getPassword();
349 final var schemas = yangLibUsername != null && yangLigPassword != null
350 ? LibraryModulesSchemas.create(yangLibURL, yangLibUsername, yangLigPassword)
351 : LibraryModulesSchemas.create(yangLibURL);
// One registration per available model. Each gets its own provider instance over the full model map and is
// advertised as YANG text at REMOTE_IO cost.
353 for (var entry : schemas.getAvailableModels().entrySet()) {
354 registrations.add(schemaRegistry.registerSchemaSource(new LibrarySchemaSourceProvider(
355 remoteDeviceId, schemas.getAvailableModels()),
356 PotentialSchemaSource.create(entry.getKey(), YangTextSchemaSource.class,
357 PotentialSchemaSource.Costs.REMOTE_IO.getValue())));
359 return List.copyOf(registrations);
367 synchronized long attempts() {