2 * Copyright (c) 2020 PANTHEON.tech, s.r.o. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.netconf.topology.spi;
10 import static java.util.Objects.requireNonNull;
12 import com.google.common.annotations.VisibleForTesting;
13 import com.google.common.util.concurrent.ListenableFuture;
14 import com.google.common.util.concurrent.ListeningScheduledExecutorService;
15 import com.google.common.util.concurrent.MoreExecutors;
16 import java.util.ArrayList;
17 import java.util.List;
18 import java.util.concurrent.ExecutionException;
19 import java.util.concurrent.Executor;
20 import java.util.concurrent.ScheduledExecutorService;
21 import java.util.concurrent.TimeUnit;
22 import org.checkerframework.checker.lock.qual.GuardedBy;
23 import org.checkerframework.checker.lock.qual.Holding;
24 import org.eclipse.jdt.annotation.NonNull;
25 import org.opendaylight.mdsal.dom.api.DOMNotification;
26 import org.opendaylight.netconf.client.NetconfClientFactory;
27 import org.opendaylight.netconf.client.conf.NetconfClientConfiguration;
28 import org.opendaylight.netconf.client.mdsal.LibraryModulesSchemas;
29 import org.opendaylight.netconf.client.mdsal.LibrarySchemaSourceProvider;
30 import org.opendaylight.netconf.client.mdsal.NetconfDevice.SchemaResourcesDTO;
31 import org.opendaylight.netconf.client.mdsal.NetconfDeviceBuilder;
32 import org.opendaylight.netconf.client.mdsal.NetconfDeviceCommunicator;
33 import org.opendaylight.netconf.client.mdsal.NetconfDeviceSchema;
34 import org.opendaylight.netconf.client.mdsal.SchemalessNetconfDevice;
35 import org.opendaylight.netconf.client.mdsal.api.BaseNetconfSchemas;
36 import org.opendaylight.netconf.client.mdsal.api.DeviceActionFactory;
37 import org.opendaylight.netconf.client.mdsal.api.NetconfSessionPreferences;
38 import org.opendaylight.netconf.client.mdsal.api.RemoteDevice;
39 import org.opendaylight.netconf.client.mdsal.api.RemoteDeviceHandler;
40 import org.opendaylight.netconf.client.mdsal.api.RemoteDeviceId;
41 import org.opendaylight.netconf.client.mdsal.api.RemoteDeviceServices;
42 import org.opendaylight.netconf.client.mdsal.api.SchemaResourceManager;
43 import org.opendaylight.netconf.client.mdsal.spi.KeepaliveSalFacade;
44 import org.opendaylight.netconf.transport.api.UnsupportedConfigurationException;
45 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.Uri;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.optional.rev221225.NetconfNodeAugmentedOptional;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev221225.NetconfNode;
48 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
49 import org.opendaylight.yangtools.concepts.AbstractRegistration;
50 import org.opendaylight.yangtools.yang.model.repo.api.YangTextSchemaSource;
51 import org.opendaylight.yangtools.yang.model.repo.spi.PotentialSchemaSource;
52 import org.opendaylight.yangtools.yang.model.repo.spi.SchemaSourceRegistration;
53 import org.slf4j.Logger;
54 import org.slf4j.LoggerFactory;
57 * All state associated with a NETCONF topology node. Each node handles its own reconnection.
59 public final class NetconfNodeHandler extends AbstractRegistration implements RemoteDeviceHandler {
60 private static final Logger LOG = LoggerFactory.getLogger(NetconfNodeHandler.class);
62 private final @NonNull List<SchemaSourceRegistration<?>> yanglibRegistrations;
63 private final @NonNull NetconfClientFactory clientFactory;
64 private final @NonNull NetconfClientConfiguration clientConfig;
65 private final @NonNull NetconfDeviceCommunicator communicator;
66 private final @NonNull RemoteDeviceHandler delegate;
67 private final @NonNull ListeningScheduledExecutorService scheduledExecutor;
68 private final @NonNull RemoteDeviceId deviceId;
70 private final long maxAttempts;
71 private final int minSleep;
72 private final double sleepFactor;
75 private long attempts;
77 private long lastSleep;
79 private ListenableFuture<?> currentTask;
81 public NetconfNodeHandler(final NetconfClientFactory clientFactory,
82 final ScheduledExecutorService scheduledExecutor, final BaseNetconfSchemas baseSchemas,
83 final SchemaResourceManager schemaManager, final Executor processingExecutor,
84 final NetconfClientConfigurationBuilderFactory builderFactory,
85 final DeviceActionFactory deviceActionFactory, final RemoteDeviceHandler delegate,
86 final RemoteDeviceId deviceId, final NodeId nodeId, final NetconfNode node,
87 final NetconfNodeAugmentedOptional nodeOptional) {
88 this.clientFactory = requireNonNull(clientFactory);
89 // FIXME: do not wrap this executor
90 this.scheduledExecutor = MoreExecutors.listeningDecorator(scheduledExecutor);
91 this.delegate = requireNonNull(delegate);
92 this.deviceId = requireNonNull(deviceId);
94 maxAttempts = node.requireMaxConnectionAttempts().toJava();
95 minSleep = node.requireBetweenAttemptsTimeoutMillis().toJava();
96 sleepFactor = node.requireSleepFactor().doubleValue();
98 // Setup reconnection on empty context, if so configured
99 // FIXME: NETCONF-925: implement this
100 if (nodeOptional != null && nodeOptional.getIgnoreMissingSchemaSources().getAllowed()) {
101 LOG.warn("Ignoring missing schema sources is not currently implemented for {}", deviceId);
104 // The facade we are going it present to NetconfDevice
105 RemoteDeviceHandler salFacade;
106 final KeepaliveSalFacade keepAliveFacade;
107 final long keepaliveDelay = node.requireKeepaliveDelay().toJava();
108 if (keepaliveDelay > 0) {
109 LOG.info("Adding keepalive facade, for device {}", nodeId);
110 salFacade = keepAliveFacade = new KeepaliveSalFacade(deviceId, this, scheduledExecutor, keepaliveDelay,
111 node.requireDefaultRequestTimeoutMillis().toJava());
114 keepAliveFacade = null;
117 final RemoteDevice<NetconfDeviceCommunicator> device;
118 if (node.requireSchemaless()) {
119 device = new SchemalessNetconfDevice(baseSchemas, deviceId, salFacade);
120 yanglibRegistrations = List.of();
122 final var resources = schemaManager.getSchemaResources(node.getSchemaCacheDirectory(), nodeId.getValue());
123 device = new NetconfDeviceBuilder()
124 .setReconnectOnSchemasChange(node.requireReconnectOnChangedSchema())
125 .setSchemaResourcesDTO(resources)
126 .setGlobalProcessingExecutor(processingExecutor)
128 .setSalFacade(salFacade)
129 .setDeviceActionFactory(deviceActionFactory)
130 .setBaseSchemas(baseSchemas)
132 yanglibRegistrations = registerDeviceSchemaSources(deviceId, node, resources);
135 final int rpcMessageLimit = node.requireConcurrentRpcLimit().toJava();
136 if (rpcMessageLimit < 1) {
137 LOG.info("Concurrent rpc limit is smaller than 1, no limit will be enforced for device {}", deviceId);
140 communicator = new NetconfDeviceCommunicator(deviceId, device, rpcMessageLimit,
141 NetconfNodeUtils.extractUserCapabilities(node));
143 if (keepAliveFacade != null) {
144 keepAliveFacade.setListener(communicator);
147 clientConfig = builderFactory.createClientConfigurationBuilder(nodeId, node)
148 .withSessionListener(communicator)
152 public synchronized void connect() {
154 lastSleep = minSleep;
159 private void lockedConnect() {
161 final var clientFuture = clientFactory.createClient(clientConfig);
162 clientFuture.addListener(() -> connectComplete(clientFuture), MoreExecutors.directExecutor());
163 currentTask = clientFuture;
164 } catch (UnsupportedConfigurationException e) {
169 private void connectComplete(final ListenableFuture<?> future) {
170 // Locked manipulation of internal state
171 synchronized (this) {
172 // A quick sanity check
173 if (currentTask != future) {
174 LOG.warn("Ignoring connection completion, expected {} actual {}", future, currentTask);
178 // ListenableFuture provide no detail on error unless you attempt to get() the result
179 // then only the original exception is rethrown wrapped with ExecutionException
181 if (future.isCancelled() || future.isDone() && future.get() != null) {
182 // Success or cancellation, nothing else to do.
183 // In case of success the rest of the setup is driven by RemoteDeviceHandler callbacks
186 } catch (InterruptedException | ExecutionException e) {
187 LOG.debug("Connection attempt {} to {} failed", attempts, deviceId, e);
191 // We are invoking callbacks, do not hold locks
196 protected synchronized void removeRegistration() {
197 if (currentTask != null) {
198 currentTask.cancel(false);
202 communicator.close();
204 yanglibRegistrations.forEach(SchemaSourceRegistration::close);
208 public void onDeviceConnected(final NetconfDeviceSchema deviceSchema,
209 final NetconfSessionPreferences sessionPreferences, final RemoteDeviceServices services) {
210 synchronized (this) {
213 delegate.onDeviceConnected(deviceSchema, sessionPreferences, services);
217 public void onDeviceDisconnected() {
218 delegate.onDeviceDisconnected();
223 public void onDeviceFailed(final Throwable throwable) {
224 // We have not reported onDeviceConnected(), so from the view of delete we are still connecting
225 LOG.debug("Connection attempt failed", throwable);
230 public void onNotification(final DOMNotification domNotification) {
231 delegate.onNotification(domNotification);
234 private void reconnectOrFail() {
235 final var ex = scheduleReconnect();
237 delegate.onDeviceFailed(ex);
241 private synchronized Exception scheduleReconnect() {
246 final long delayMillis;
248 // We have exceeded the number of connection attempts
249 if (maxAttempts > 0 && attempts >= maxAttempts) {
250 LOG.info("Failed to connect {} after {} attempts, not attempting", deviceId, attempts);
251 return new ConnectGivenUpException("Given up connecting " + deviceId + " after " + attempts + " attempts");
254 // First connection attempt gets initialized to minimum sleep, each subsequent is exponentially backed off
257 final long nextSleep = (long) (lastSleep * sleepFactor);
258 // check for overflow
259 delayMillis = nextSleep >= 0 ? nextSleep : Long.MAX_VALUE;
261 delayMillis = minSleep;
265 lastSleep = delayMillis;
266 LOG.debug("Retrying {} connection attempt {} after {} milliseconds", deviceId, attempts, delayMillis);
268 // Schedule a task for the right time. We always go through the executor to eliminate the special case of
269 // immediate reconnect. While we could check and got to lockedConnect(), it makes for a rare special case.
270 // That special case makes for more code paths to test and introduces additional uncertainty whether
271 // the attempt was executed on this thread or not.
272 currentTask = scheduledExecutor.schedule(this::reconnect, delayMillis, TimeUnit.MILLISECONDS);
276 private synchronized void reconnect() {
283 private static List<SchemaSourceRegistration<?>> registerDeviceSchemaSources(final RemoteDeviceId remoteDeviceId,
284 final NetconfNode node, final SchemaResourcesDTO resources) {
285 final var yangLibrary = node.getYangLibrary();
286 if (yangLibrary != null) {
287 final Uri uri = yangLibrary.getYangLibraryUrl();
289 final var registrations = new ArrayList<SchemaSourceRegistration<?>>();
290 final var yangLibURL = uri.getValue();
291 final var schemaRegistry = resources.getSchemaRegistry();
293 // pre register yang library sources as fallback schemas to schema registry
294 final var yangLibUsername = yangLibrary.getUsername();
295 final var yangLigPassword = yangLibrary.getPassword();
296 final var schemas = yangLibUsername != null && yangLigPassword != null
297 ? LibraryModulesSchemas.create(yangLibURL, yangLibUsername, yangLigPassword)
298 : LibraryModulesSchemas.create(yangLibURL);
300 for (var entry : schemas.getAvailableModels().entrySet()) {
301 registrations.add(schemaRegistry.registerSchemaSource(new LibrarySchemaSourceProvider(
302 remoteDeviceId, schemas.getAvailableModels()),
303 PotentialSchemaSource.create(entry.getKey(), YangTextSchemaSource.class,
304 PotentialSchemaSource.Costs.REMOTE_IO.getValue())));
306 return List.copyOf(registrations);
314 synchronized long attempts() {