2 * Copyright (c) 2020 PANTHEON.tech, s.r.o. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.netconf.topology.spi;
10 import static java.util.Objects.requireNonNull;
12 import com.google.common.annotations.VisibleForTesting;
13 import io.netty.util.concurrent.EventExecutor;
14 import io.netty.util.concurrent.Future;
15 import java.util.ArrayList;
16 import java.util.List;
17 import java.util.concurrent.CancellationException;
18 import java.util.concurrent.Executor;
19 import java.util.concurrent.ScheduledExecutorService;
20 import java.util.concurrent.TimeUnit;
21 import org.checkerframework.checker.lock.qual.GuardedBy;
22 import org.checkerframework.checker.lock.qual.Holding;
23 import org.eclipse.jdt.annotation.NonNull;
24 import org.opendaylight.mdsal.dom.api.DOMNotification;
25 import org.opendaylight.netconf.client.NetconfClientDispatcher;
26 import org.opendaylight.netconf.client.conf.NetconfClientConfiguration;
27 import org.opendaylight.netconf.client.mdsal.LibraryModulesSchemas;
28 import org.opendaylight.netconf.client.mdsal.LibrarySchemaSourceProvider;
29 import org.opendaylight.netconf.client.mdsal.NetconfDevice.SchemaResourcesDTO;
30 import org.opendaylight.netconf.client.mdsal.NetconfDeviceBuilder;
31 import org.opendaylight.netconf.client.mdsal.NetconfDeviceCommunicator;
32 import org.opendaylight.netconf.client.mdsal.NetconfDeviceSchema;
33 import org.opendaylight.netconf.client.mdsal.SchemalessNetconfDevice;
34 import org.opendaylight.netconf.client.mdsal.api.BaseNetconfSchemas;
35 import org.opendaylight.netconf.client.mdsal.api.DeviceActionFactory;
36 import org.opendaylight.netconf.client.mdsal.api.NetconfSessionPreferences;
37 import org.opendaylight.netconf.client.mdsal.api.RemoteDevice;
38 import org.opendaylight.netconf.client.mdsal.api.RemoteDeviceHandler;
39 import org.opendaylight.netconf.client.mdsal.api.RemoteDeviceId;
40 import org.opendaylight.netconf.client.mdsal.api.RemoteDeviceServices;
41 import org.opendaylight.netconf.client.mdsal.api.SchemaResourceManager;
42 import org.opendaylight.netconf.client.mdsal.spi.KeepaliveSalFacade;
43 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.Uri;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.optional.rev221225.NetconfNodeAugmentedOptional;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev221225.NetconfNode;
46 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
47 import org.opendaylight.yangtools.concepts.AbstractRegistration;
48 import org.opendaylight.yangtools.yang.model.repo.api.YangTextSchemaSource;
49 import org.opendaylight.yangtools.yang.model.repo.spi.PotentialSchemaSource;
50 import org.opendaylight.yangtools.yang.model.repo.spi.SchemaSourceRegistration;
51 import org.slf4j.Logger;
52 import org.slf4j.LoggerFactory;
55 * All state associated with a NETCONF topology node. Each node handles its own reconnection.
57 public final class NetconfNodeHandler extends AbstractRegistration implements RemoteDeviceHandler {
58 private static final Logger LOG = LoggerFactory.getLogger(NetconfNodeHandler.class);
60 private final @NonNull List<SchemaSourceRegistration<?>> yanglibRegistrations;
61 private final @NonNull NetconfClientDispatcher clientDispatcher;
62 private final @NonNull NetconfClientConfiguration clientConfig;
63 private final @NonNull NetconfDeviceCommunicator communicator;
64 private final @NonNull RemoteDeviceHandler delegate;
65 private final @NonNull EventExecutor eventExecutor;
66 private final @NonNull RemoteDeviceId deviceId;
68 private final long maxAttempts;
69 private final int minSleep;
70 private final double sleepFactor;
73 private long attempts;
75 private long lastSleep;
77 private Future<?> currentTask;
79 public NetconfNodeHandler(final NetconfClientDispatcher clientDispatcher, final EventExecutor eventExecutor,
80 final ScheduledExecutorService keepaliveExecutor, final BaseNetconfSchemas baseSchemas,
81 final SchemaResourceManager schemaManager, final Executor processingExecutor,
82 final NetconfClientConfigurationBuilderFactory builderFactory,
83 final DeviceActionFactory deviceActionFactory, final RemoteDeviceHandler delegate,
84 final RemoteDeviceId deviceId, final NodeId nodeId, final NetconfNode node,
85 final NetconfNodeAugmentedOptional nodeOptional) {
86 this.clientDispatcher = requireNonNull(clientDispatcher);
87 this.eventExecutor = requireNonNull(eventExecutor);
88 this.delegate = requireNonNull(delegate);
89 this.deviceId = requireNonNull(deviceId);
91 maxAttempts = node.requireMaxConnectionAttempts().toJava();
92 minSleep = node.requireBetweenAttemptsTimeoutMillis().toJava();
93 sleepFactor = node.requireSleepFactor().doubleValue();
95 // Setup reconnection on empty context, if so configured
96 // FIXME: NETCONF-925: implement this
97 if (nodeOptional != null && nodeOptional.getIgnoreMissingSchemaSources().getAllowed()) {
98 LOG.warn("Ignoring missing schema sources is not currently implemented for {}", deviceId);
101 // The facade we are going it present to NetconfDevice
102 RemoteDeviceHandler salFacade;
103 final KeepaliveSalFacade keepAliveFacade;
104 final long keepaliveDelay = node.requireKeepaliveDelay().toJava();
105 if (keepaliveDelay > 0) {
106 LOG.info("Adding keepalive facade, for device {}", nodeId);
107 salFacade = keepAliveFacade = new KeepaliveSalFacade(deviceId, this, keepaliveExecutor, keepaliveDelay,
108 node.requireDefaultRequestTimeoutMillis().toJava());
111 keepAliveFacade = null;
114 final RemoteDevice<NetconfDeviceCommunicator> device;
115 if (node.requireSchemaless()) {
116 device = new SchemalessNetconfDevice(baseSchemas, deviceId, salFacade);
117 yanglibRegistrations = List.of();
119 final var resources = schemaManager.getSchemaResources(node.getSchemaCacheDirectory(), nodeId.getValue());
120 device = new NetconfDeviceBuilder()
121 .setReconnectOnSchemasChange(node.requireReconnectOnChangedSchema())
122 .setSchemaResourcesDTO(resources)
123 .setGlobalProcessingExecutor(processingExecutor)
125 .setSalFacade(salFacade)
126 .setDeviceActionFactory(deviceActionFactory)
127 .setBaseSchemas(baseSchemas)
129 yanglibRegistrations = registerDeviceSchemaSources(deviceId, node, resources);
132 final int rpcMessageLimit = node.requireConcurrentRpcLimit().toJava();
133 if (rpcMessageLimit < 1) {
134 LOG.info("Concurrent rpc limit is smaller than 1, no limit will be enforced for device {}", deviceId);
137 communicator = new NetconfDeviceCommunicator(deviceId, device, rpcMessageLimit,
138 NetconfNodeUtils.extractUserCapabilities(node));
140 if (keepAliveFacade != null) {
141 keepAliveFacade.setListener(communicator);
144 clientConfig = builderFactory.createClientConfigurationBuilder(nodeId, node)
145 .withSessionListener(communicator)
149 public synchronized void connect() {
151 lastSleep = minSleep;
156 private void lockedConnect() {
157 currentTask = clientDispatcher.createClient(clientConfig);
158 currentTask.addListener(this::connectComplete);
161 private void connectComplete(final Future<?> future) {
162 final Throwable cause;
164 // Locked manipulation of internal state
165 synchronized (this) {
166 // A quick sanity check
167 if (currentTask != future) {
168 LOG.warn("Ignoring connection completion, expected {} actual {}", future, currentTask);
173 cause = future.cause();
174 if (cause == null || cause instanceof CancellationException) {
175 // Success or cancellation, nothing else to do.
176 // In case of success the rest of the setup is driven by RemoteDeviceHandler callbacks
180 LOG.debug("Connection attempt {} to {} failed", attempts, deviceId, cause);
183 // We are invoking callbacks, do not hold locks
184 onDeviceFailed(cause);
188 protected synchronized void removeRegistration() {
189 if (currentTask != null) {
190 currentTask.cancel(false);
194 communicator.close();
196 yanglibRegistrations.forEach(SchemaSourceRegistration::close);
200 public void onDeviceConnected(final NetconfDeviceSchema deviceSchema,
201 final NetconfSessionPreferences sessionPreferences, final RemoteDeviceServices services) {
202 synchronized (this) {
205 delegate.onDeviceConnected(deviceSchema, sessionPreferences, services);
209 public void onDeviceDisconnected() {
210 delegate.onDeviceDisconnected();
215 public void onDeviceFailed(final Throwable throwable) {
216 LOG.debug("Connection attempt failed", throwable);
217 delegate.onDeviceFailed(throwable);
222 public void onNotification(final DOMNotification domNotification) {
223 delegate.onNotification(domNotification);
226 private synchronized void scheduleReconnect() {
231 final long delayMillis;
233 // We have exceeded the number of connection attempts
234 if (maxAttempts > 0 && attempts >= maxAttempts) {
235 LOG.info("Failed to connect {} after {} attempts, not attempting", deviceId, attempts);
239 // First connection attempt gets initialized to minimum sleep, each subsequent is exponentially backed off
242 final long nextSleep = (long) (lastSleep * sleepFactor);
243 // check for overflow
244 delayMillis = nextSleep >= 0 ? nextSleep : Long.MAX_VALUE;
246 delayMillis = minSleep;
250 lastSleep = delayMillis;
251 LOG.debug("Retrying {} connection attempt {} after {} milliseconds", deviceId, attempts, delayMillis);
253 // If we are not sleeping at all, return an already-succeeded future
254 if (delayMillis == 0) {
259 // Schedule a task for the right time. It will also clear the flag.
260 currentTask = eventExecutor.schedule(this::reconnect, delayMillis, TimeUnit.MILLISECONDS);
263 private synchronized void reconnect() {
270 private static List<SchemaSourceRegistration<?>> registerDeviceSchemaSources(final RemoteDeviceId remoteDeviceId,
271 final NetconfNode node, final SchemaResourcesDTO resources) {
272 final var yangLibrary = node.getYangLibrary();
273 if (yangLibrary != null) {
274 final Uri uri = yangLibrary.getYangLibraryUrl();
276 final var registrations = new ArrayList<SchemaSourceRegistration<?>>();
277 final var yangLibURL = uri.getValue();
278 final var schemaRegistry = resources.getSchemaRegistry();
280 // pre register yang library sources as fallback schemas to schema registry
281 final var yangLibUsername = yangLibrary.getUsername();
282 final var yangLigPassword = yangLibrary.getPassword();
283 final var schemas = yangLibUsername != null && yangLigPassword != null
284 ? LibraryModulesSchemas.create(yangLibURL, yangLibUsername, yangLigPassword)
285 : LibraryModulesSchemas.create(yangLibURL);
287 for (var entry : schemas.getAvailableModels().entrySet()) {
288 registrations.add(schemaRegistry.registerSchemaSource(new LibrarySchemaSourceProvider(
289 remoteDeviceId, schemas.getAvailableModels()),
290 PotentialSchemaSource.create(entry.getKey(), YangTextSchemaSource.class,
291 PotentialSchemaSource.Costs.REMOTE_IO.getValue())));
293 return List.copyOf(registrations);
301 synchronized long attempts() {