Bump upstreams
[netconf.git] / apps / netconf-topology / src / main / java / org / opendaylight / netconf / topology / spi / NetconfNodeHandler.java
1 /*
2  * Copyright (c) 2020 PANTHEON.tech, s.r.o. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.netconf.topology.spi;
9
10 import static java.util.Objects.requireNonNull;
11
12 import com.google.common.annotations.VisibleForTesting;
13 import com.google.common.util.concurrent.FutureCallback;
14 import com.google.common.util.concurrent.Futures;
15 import com.google.common.util.concurrent.ListenableFuture;
16 import com.google.common.util.concurrent.MoreExecutors;
17 import io.netty.util.Timeout;
18 import java.util.ArrayList;
19 import java.util.List;
20 import java.util.concurrent.CancellationException;
21 import java.util.concurrent.TimeUnit;
22 import org.checkerframework.checker.lock.qual.GuardedBy;
23 import org.checkerframework.checker.lock.qual.Holding;
24 import org.eclipse.jdt.annotation.NonNull;
25 import org.opendaylight.mdsal.dom.api.DOMNotification;
26 import org.opendaylight.netconf.client.NetconfClientFactory;
27 import org.opendaylight.netconf.client.NetconfClientSession;
28 import org.opendaylight.netconf.client.conf.NetconfClientConfiguration;
29 import org.opendaylight.netconf.client.mdsal.LibraryModulesSchemas;
30 import org.opendaylight.netconf.client.mdsal.LibrarySchemaSourceProvider;
31 import org.opendaylight.netconf.client.mdsal.NetconfDevice.SchemaResourcesDTO;
32 import org.opendaylight.netconf.client.mdsal.NetconfDeviceBuilder;
33 import org.opendaylight.netconf.client.mdsal.NetconfDeviceCommunicator;
34 import org.opendaylight.netconf.client.mdsal.NetconfDeviceSchema;
35 import org.opendaylight.netconf.client.mdsal.SchemalessNetconfDevice;
36 import org.opendaylight.netconf.client.mdsal.api.BaseNetconfSchemas;
37 import org.opendaylight.netconf.client.mdsal.api.DeviceActionFactory;
38 import org.opendaylight.netconf.client.mdsal.api.NetconfSessionPreferences;
39 import org.opendaylight.netconf.client.mdsal.api.RemoteDevice;
40 import org.opendaylight.netconf.client.mdsal.api.RemoteDeviceHandler;
41 import org.opendaylight.netconf.client.mdsal.api.RemoteDeviceId;
42 import org.opendaylight.netconf.client.mdsal.api.RemoteDeviceServices;
43 import org.opendaylight.netconf.client.mdsal.api.SchemaResourceManager;
44 import org.opendaylight.netconf.client.mdsal.spi.KeepaliveSalFacade;
45 import org.opendaylight.netconf.common.NetconfTimer;
46 import org.opendaylight.netconf.transport.api.UnsupportedConfigurationException;
47 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.Uri;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.optional.rev221225.NetconfNodeAugmentedOptional;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev231121.NetconfNode;
50 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
51 import org.opendaylight.yangtools.concepts.AbstractRegistration;
52 import org.opendaylight.yangtools.concepts.Registration;
53 import org.opendaylight.yangtools.yang.model.api.source.YangTextSource;
54 import org.opendaylight.yangtools.yang.model.repo.spi.PotentialSchemaSource;
55 import org.slf4j.Logger;
56 import org.slf4j.LoggerFactory;
57
58 /**
59  * All state associated with a NETCONF topology node. Each node handles its own reconnection.
60  */
61 public final class NetconfNodeHandler extends AbstractRegistration implements RemoteDeviceHandler {
62     private abstract static sealed class Task {
63
64         abstract void cancel();
65     }
66
67     private final class ConnectingTask extends Task implements FutureCallback<NetconfClientSession> {
68         private final ListenableFuture<NetconfClientSession> future;
69
70         ConnectingTask(final ListenableFuture<NetconfClientSession> future) {
71             this.future = requireNonNull(future);
72         }
73
74         @Override
75         void cancel() {
76             future.cancel(false);
77         }
78
79         @Override
80         public void onSuccess(final NetconfClientSession result) {
81             connectComplete(this);
82         }
83
84         @Override
85         public void onFailure(final Throwable cause) {
86             if (cause instanceof CancellationException) {
87                 connectComplete(this);
88             } else {
89                 connectFailed(this, cause);
90             }
91         }
92     }
93
94     private static final class SleepingTask extends Task {
95         private final Timeout timeout;
96
97         SleepingTask(final Timeout timeout) {
98             this.timeout = requireNonNull(timeout);
99         }
100
101         @Override
102         void cancel() {
103             timeout.cancel();
104         }
105     }
106
107     private static final Logger LOG = LoggerFactory.getLogger(NetconfNodeHandler.class);
108
109     private final @NonNull List<Registration> yanglibRegistrations;
110     private final @NonNull NetconfClientFactory clientFactory;
111     private final @NonNull NetconfClientConfiguration clientConfig;
112     private final @NonNull NetconfDeviceCommunicator communicator;
113     private final @NonNull RemoteDeviceHandler delegate;
114     private final @NonNull NetconfTimer timer;
115     private final @NonNull RemoteDeviceId deviceId;
116
117     private final long maxBackoff;
118     private final long maxAttempts;
119     private final int minBackoff;
120     private final double backoffMultiplier;
121     private final double jitter;
122
123     @GuardedBy("this")
124     private long attempts;
125     @GuardedBy("this")
126     private long lastBackoff;
127     @GuardedBy("this")
128     private Task currentTask;
129
130     public NetconfNodeHandler(final NetconfClientFactory clientFactory, final NetconfTimer timer,
131             final BaseNetconfSchemas baseSchemas, final SchemaResourceManager schemaManager,
132             final NetconfTopologySchemaAssembler schemaAssembler,
133             final NetconfClientConfigurationBuilderFactory builderFactory,
134             final DeviceActionFactory deviceActionFactory, final RemoteDeviceHandler delegate,
135             final RemoteDeviceId deviceId, final NodeId nodeId, final NetconfNode node,
136             final NetconfNodeAugmentedOptional nodeOptional) {
137         this.clientFactory = requireNonNull(clientFactory);
138         this.timer = requireNonNull(timer);
139         this.delegate = requireNonNull(delegate);
140         this.deviceId = requireNonNull(deviceId);
141
142         maxAttempts = node.requireMaxConnectionAttempts().toJava();
143         minBackoff = node.requireMinBackoffMillis().toJava();
144         backoffMultiplier = node.requireBackoffMultiplier().doubleValue();
145         final long potentialMaxBackoff = node.requireMaxBackoffMillis().toJava();
146         maxBackoff = potentialMaxBackoff >= minBackoff ? potentialMaxBackoff : minBackoff;
147         jitter = node.getBackoffJitter().doubleValue();
148
149         // Setup reconnection on empty context, if so configured
150         // FIXME: NETCONF-925: implement this
151         if (nodeOptional != null && nodeOptional.getIgnoreMissingSchemaSources().getAllowed()) {
152             LOG.warn("Ignoring missing schema sources is not currently implemented for {}", deviceId);
153         }
154
155         // The facade we are going it present to NetconfDevice
156         RemoteDeviceHandler salFacade;
157         final KeepaliveSalFacade keepAliveFacade;
158         final long keepaliveDelay = node.requireKeepaliveDelay().toJava();
159         if (keepaliveDelay > 0) {
160             LOG.info("Adding keepalive facade, for device {}", nodeId);
161             salFacade = keepAliveFacade = new KeepaliveSalFacade(deviceId, this, timer, keepaliveDelay,
162                 node.requireDefaultRequestTimeoutMillis().toJava());
163         } else {
164             salFacade = this;
165             keepAliveFacade = null;
166         }
167
168         final RemoteDevice<NetconfDeviceCommunicator> device;
169         if (node.requireSchemaless()) {
170             device = new SchemalessNetconfDevice(baseSchemas, deviceId, salFacade);
171             yanglibRegistrations = List.of();
172         } else {
173             final var resources = schemaManager.getSchemaResources(node.getSchemaCacheDirectory(), nodeId.getValue());
174             device = new NetconfDeviceBuilder()
175                 .setReconnectOnSchemasChange(node.requireReconnectOnChangedSchema())
176                 .setSchemaResourcesDTO(resources)
177                 .setGlobalProcessingExecutor(schemaAssembler.executor())
178                 .setId(deviceId)
179                 .setSalFacade(salFacade)
180                 .setDeviceActionFactory(deviceActionFactory)
181                 .setBaseSchemas(baseSchemas)
182                 .build();
183             yanglibRegistrations = registerDeviceSchemaSources(deviceId, node, resources);
184         }
185
186         final int rpcMessageLimit = node.requireConcurrentRpcLimit().toJava();
187         if (rpcMessageLimit < 1) {
188             LOG.info("Concurrent rpc limit is smaller than 1, no limit will be enforced for device {}", deviceId);
189         }
190
191         communicator = new NetconfDeviceCommunicator(deviceId, device, rpcMessageLimit,
192             NetconfNodeUtils.extractUserCapabilities(node));
193
194         if (keepAliveFacade != null) {
195             keepAliveFacade.setListener(communicator);
196         }
197
198         clientConfig = builderFactory.createClientConfigurationBuilder(nodeId, node)
199             .withSessionListener(communicator)
200             .build();
201     }
202
203     public synchronized void connect() {
204         attempts = 1;
205         lastBackoff = minBackoff;
206         lockedConnect();
207     }
208
209     @Holding("this")
210     private void lockedConnect() {
211         final ListenableFuture<NetconfClientSession> connectFuture;
212         try {
213             connectFuture = clientFactory.createClient(clientConfig);
214         } catch (UnsupportedConfigurationException e) {
215             onDeviceFailed(e);
216             return;
217         }
218
219         final var nextTask = new ConnectingTask(connectFuture);
220         currentTask = nextTask;
221         Futures.addCallback(connectFuture, nextTask, MoreExecutors.directExecutor());
222     }
223
224     private synchronized void connectComplete(final ConnectingTask task) {
225         // Just clear the task, if it matches our expectation
226         completeTask(task);
227     }
228
229     private void connectFailed(final ConnectingTask task, final Throwable cause) {
230         synchronized (this) {
231             if (completeTask(task)) {
232                 // Mismatched future or the connection has been cancelled: nothing else to do
233                 return;
234             }
235             LOG.debug("Connection attempt {} to {} failed", attempts, deviceId, cause);
236         }
237
238         // We are invoking callbacks, do not hold locks
239         reconnectOrFail();
240     }
241
242     @Holding("this")
243     private boolean completeTask(final ConnectingTask task) {
244         // A quick sanity check
245         if (task.equals(currentTask)) {
246             currentTask = null;
247             return false;
248         }
249         LOG.warn("Ignoring connection completion, expected {} actual {}", currentTask, task);
250         return true;
251     }
252
253     @Override
254     protected synchronized void removeRegistration() {
255         if (currentTask != null) {
256             currentTask.cancel();
257             currentTask = null;
258         }
259
260         communicator.close();
261         delegate.close();
262         yanglibRegistrations.forEach(Registration::close);
263     }
264
265     @Override
266     public void onDeviceConnected(final NetconfDeviceSchema deviceSchema,
267             final NetconfSessionPreferences sessionPreferences, final RemoteDeviceServices services) {
268         synchronized (this) {
269             attempts = 0;
270         }
271         delegate.onDeviceConnected(deviceSchema, sessionPreferences, services);
272     }
273
274     @Override
275     public void onDeviceDisconnected() {
276         delegate.onDeviceDisconnected();
277         reconnectOrFail();
278     }
279
280     @Override
281     public void onDeviceFailed(final Throwable throwable) {
282         // We have not reported onDeviceConnected(), so from the view of delete we are still connecting
283         LOG.debug("Connection attempt failed", throwable);
284         reconnectOrFail();
285     }
286
287     @Override
288     public void onNotification(final DOMNotification domNotification) {
289         delegate.onNotification(domNotification);
290     }
291
292     private void reconnectOrFail() {
293         final var ex = scheduleReconnect();
294         if (ex != null) {
295             delegate.onDeviceFailed(ex);
296         }
297     }
298
299     private synchronized Exception scheduleReconnect() {
300         if (isClosed()) {
301             return null;
302         }
303
304         final long backoffMillis;
305
306         // We have exceeded the number of connection attempts
307         if (maxAttempts > 0 && attempts >= maxAttempts) {
308             LOG.info("Failed to connect {} after {} attempts, not attempting", deviceId, attempts);
309             return new ConnectGivenUpException("Given up connecting " + deviceId + " after " + attempts + " attempts");
310         }
311
312         // First connection attempt gets initialized to minimum backoff, each subsequent is exponentially backed off
313         // by backoffMultiplier (default 1.5) until reach max sleep and randomized by +/- jitter (default 0.1).
314         if (attempts != 0) {
315             final var currentBackoff = Math.min(lastBackoff * backoffMultiplier, maxBackoff);
316             backoffMillis = (long) (currentBackoff * (Math.random() * (jitter * 2) + (1 - jitter)));
317         } else {
318             backoffMillis = minBackoff;
319         }
320
321         attempts++;
322         lastBackoff = backoffMillis;
323         LOG.debug("Retrying {} connection attempt {} after {} milliseconds", deviceId, attempts, backoffMillis);
324
325         // Schedule a task for the right time. We always go through the executor to eliminate the special case of
326         // immediate reconnect. While we could check and got to lockedConnect(), it makes for a rare special case.
327         // That special case makes for more code paths to test and introduces additional uncertainty as to whether
328         // the attempt was executed on this thread or not.
329         currentTask = new SleepingTask(timer.newTimeout(this::reconnect, backoffMillis, TimeUnit.MILLISECONDS));
330         return null;
331     }
332
333     private synchronized void reconnect(final Timeout timeout) {
334         currentTask = null;
335         if (notClosed()) {
336             lockedConnect();
337         }
338     }
339
340     private static List<Registration> registerDeviceSchemaSources(final RemoteDeviceId remoteDeviceId,
341             final NetconfNode node, final SchemaResourcesDTO resources) {
342         final var yangLibrary = node.getYangLibrary();
343         if (yangLibrary != null) {
344             final Uri uri = yangLibrary.getYangLibraryUrl();
345             if (uri != null) {
346                 final var registrations = new ArrayList<Registration>();
347                 final var yangLibURL = uri.getValue();
348                 final var schemaRegistry = resources.getSchemaRegistry();
349
350                 // pre register yang library sources as fallback schemas to schema registry
351                 final var yangLibUsername = yangLibrary.getUsername();
352                 final var yangLigPassword = yangLibrary.getPassword();
353                 final var schemas = yangLibUsername != null && yangLigPassword != null
354                     ? LibraryModulesSchemas.create(yangLibURL, yangLibUsername, yangLigPassword)
355                         : LibraryModulesSchemas.create(yangLibURL);
356
357                 for (var entry : schemas.getAvailableModels().entrySet()) {
358                     registrations.add(schemaRegistry.registerSchemaSource(new LibrarySchemaSourceProvider(
359                         remoteDeviceId, schemas.getAvailableModels()),
360                         PotentialSchemaSource.create(entry.getKey(), YangTextSource.class,
361                             PotentialSchemaSource.Costs.REMOTE_IO.getValue())));
362                 }
363                 return List.copyOf(registrations);
364             }
365         }
366
367         return List.of();
368     }
369
370     @VisibleForTesting
371     synchronized long attempts() {
372         return attempts;
373     }
374 }