Use base64 encoding for netconf device passwords
[netconf.git] / apps / netconf-topology / src / main / java / org / opendaylight / netconf / topology / spi / NetconfNodeHandler.java
1 /*
2  * Copyright (c) 2020 PANTHEON.tech, s.r.o. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.netconf.topology.spi;
9
10 import static java.util.Objects.requireNonNull;
11
12 import com.google.common.annotations.VisibleForTesting;
13 import com.google.common.util.concurrent.FutureCallback;
14 import com.google.common.util.concurrent.Futures;
15 import com.google.common.util.concurrent.ListenableFuture;
16 import com.google.common.util.concurrent.MoreExecutors;
17 import io.netty.util.Timeout;
18 import io.netty.util.Timer;
19 import java.util.ArrayList;
20 import java.util.List;
21 import java.util.concurrent.CancellationException;
22 import java.util.concurrent.Executor;
23 import java.util.concurrent.TimeUnit;
24 import org.checkerframework.checker.lock.qual.GuardedBy;
25 import org.checkerframework.checker.lock.qual.Holding;
26 import org.eclipse.jdt.annotation.NonNull;
27 import org.opendaylight.mdsal.dom.api.DOMNotification;
28 import org.opendaylight.netconf.client.NetconfClientFactory;
29 import org.opendaylight.netconf.client.NetconfClientSession;
30 import org.opendaylight.netconf.client.conf.NetconfClientConfiguration;
31 import org.opendaylight.netconf.client.mdsal.LibraryModulesSchemas;
32 import org.opendaylight.netconf.client.mdsal.LibrarySchemaSourceProvider;
33 import org.opendaylight.netconf.client.mdsal.NetconfDevice.SchemaResourcesDTO;
34 import org.opendaylight.netconf.client.mdsal.NetconfDeviceBuilder;
35 import org.opendaylight.netconf.client.mdsal.NetconfDeviceCommunicator;
36 import org.opendaylight.netconf.client.mdsal.NetconfDeviceSchema;
37 import org.opendaylight.netconf.client.mdsal.SchemalessNetconfDevice;
38 import org.opendaylight.netconf.client.mdsal.api.BaseNetconfSchemas;
39 import org.opendaylight.netconf.client.mdsal.api.DeviceActionFactory;
40 import org.opendaylight.netconf.client.mdsal.api.NetconfSessionPreferences;
41 import org.opendaylight.netconf.client.mdsal.api.RemoteDevice;
42 import org.opendaylight.netconf.client.mdsal.api.RemoteDeviceHandler;
43 import org.opendaylight.netconf.client.mdsal.api.RemoteDeviceId;
44 import org.opendaylight.netconf.client.mdsal.api.RemoteDeviceServices;
45 import org.opendaylight.netconf.client.mdsal.api.SchemaResourceManager;
46 import org.opendaylight.netconf.client.mdsal.spi.KeepaliveSalFacade;
47 import org.opendaylight.netconf.transport.api.UnsupportedConfigurationException;
48 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.Uri;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.optional.rev221225.NetconfNodeAugmentedOptional;
50 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev231121.NetconfNode;
51 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
52 import org.opendaylight.yangtools.concepts.AbstractRegistration;
53 import org.opendaylight.yangtools.yang.model.repo.api.YangTextSchemaSource;
54 import org.opendaylight.yangtools.yang.model.repo.spi.PotentialSchemaSource;
55 import org.opendaylight.yangtools.yang.model.repo.spi.SchemaSourceRegistration;
56 import org.slf4j.Logger;
57 import org.slf4j.LoggerFactory;
58
59 /**
60  * All state associated with a NETCONF topology node. Each node handles its own reconnection.
61  */
62 public final class NetconfNodeHandler extends AbstractRegistration implements RemoteDeviceHandler {
63     private abstract static sealed class Task {
64
65         abstract void cancel();
66     }
67
68     private final class ConnectingTask extends Task implements FutureCallback<NetconfClientSession> {
69         private final ListenableFuture<NetconfClientSession> future;
70
71         ConnectingTask(final ListenableFuture<NetconfClientSession> future) {
72             this.future = requireNonNull(future);
73         }
74
75         @Override
76         void cancel() {
77             future.cancel(false);
78         }
79
80         @Override
81         public void onSuccess(final NetconfClientSession result) {
82             connectComplete(this);
83         }
84
85         @Override
86         public void onFailure(final Throwable cause) {
87             if (cause instanceof CancellationException) {
88                 connectComplete(this);
89             } else {
90                 connectFailed(this, cause);
91             }
92         }
93     }
94
95     private static final class SleepingTask extends Task {
96         private final Timeout timeout;
97
98         SleepingTask(final Timeout timeout) {
99             this.timeout = requireNonNull(timeout);
100         }
101
102         @Override
103         void cancel() {
104             timeout.cancel();
105         }
106     }
107
108     private static final Logger LOG = LoggerFactory.getLogger(NetconfNodeHandler.class);
109
110     private final @NonNull List<SchemaSourceRegistration<?>> yanglibRegistrations;
111     private final @NonNull NetconfClientFactory clientFactory;
112     private final @NonNull NetconfClientConfiguration clientConfig;
113     private final @NonNull NetconfDeviceCommunicator communicator;
114     private final @NonNull RemoteDeviceHandler delegate;
115     private final @NonNull Timer timer;
116     private final @NonNull RemoteDeviceId deviceId;
117
118     private final long maxAttempts;
119     private final int minSleep;
120     private final double sleepFactor;
121
122     @GuardedBy("this")
123     private long attempts;
124     @GuardedBy("this")
125     private long lastSleep;
126     @GuardedBy("this")
127     private Task currentTask;
128
129     public NetconfNodeHandler(final NetconfClientFactory clientFactory, final Timer timer,
130             final BaseNetconfSchemas baseSchemas, final SchemaResourceManager schemaManager,
131             final Executor processingExecutor, final NetconfClientConfigurationBuilderFactory builderFactory,
132             final DeviceActionFactory deviceActionFactory, final RemoteDeviceHandler delegate,
133             final RemoteDeviceId deviceId, final NodeId nodeId, final NetconfNode node,
134             final NetconfNodeAugmentedOptional nodeOptional) {
135         this.clientFactory = requireNonNull(clientFactory);
136         this.timer = requireNonNull(timer);
137         this.delegate = requireNonNull(delegate);
138         this.deviceId = requireNonNull(deviceId);
139
140         maxAttempts = node.requireMaxConnectionAttempts().toJava();
141         minSleep = node.requireBetweenAttemptsTimeoutMillis().toJava();
142         sleepFactor = node.requireSleepFactor().doubleValue();
143
144         // Setup reconnection on empty context, if so configured
145         // FIXME: NETCONF-925: implement this
146         if (nodeOptional != null && nodeOptional.getIgnoreMissingSchemaSources().getAllowed()) {
147             LOG.warn("Ignoring missing schema sources is not currently implemented for {}", deviceId);
148         }
149
150         // The facade we are going it present to NetconfDevice
151         RemoteDeviceHandler salFacade;
152         final KeepaliveSalFacade keepAliveFacade;
153         final long keepaliveDelay = node.requireKeepaliveDelay().toJava();
154         if (keepaliveDelay > 0) {
155             LOG.info("Adding keepalive facade, for device {}", nodeId);
156             salFacade = keepAliveFacade = new KeepaliveSalFacade(deviceId, this, timer, keepaliveDelay,
157                 node.requireDefaultRequestTimeoutMillis().toJava());
158         } else {
159             salFacade = this;
160             keepAliveFacade = null;
161         }
162
163         final RemoteDevice<NetconfDeviceCommunicator> device;
164         if (node.requireSchemaless()) {
165             device = new SchemalessNetconfDevice(baseSchemas, deviceId, salFacade);
166             yanglibRegistrations = List.of();
167         } else {
168             final var resources = schemaManager.getSchemaResources(node.getSchemaCacheDirectory(), nodeId.getValue());
169             device = new NetconfDeviceBuilder()
170                 .setReconnectOnSchemasChange(node.requireReconnectOnChangedSchema())
171                 .setSchemaResourcesDTO(resources)
172                 .setGlobalProcessingExecutor(processingExecutor)
173                 .setId(deviceId)
174                 .setSalFacade(salFacade)
175                 .setDeviceActionFactory(deviceActionFactory)
176                 .setBaseSchemas(baseSchemas)
177                 .build();
178             yanglibRegistrations = registerDeviceSchemaSources(deviceId, node, resources);
179         }
180
181         final int rpcMessageLimit = node.requireConcurrentRpcLimit().toJava();
182         if (rpcMessageLimit < 1) {
183             LOG.info("Concurrent rpc limit is smaller than 1, no limit will be enforced for device {}", deviceId);
184         }
185
186         communicator = new NetconfDeviceCommunicator(deviceId, device, rpcMessageLimit,
187             NetconfNodeUtils.extractUserCapabilities(node));
188
189         if (keepAliveFacade != null) {
190             keepAliveFacade.setListener(communicator);
191         }
192
193         clientConfig = builderFactory.createClientConfigurationBuilder(nodeId, node)
194             .withSessionListener(communicator)
195             .build();
196     }
197
198     public synchronized void connect() {
199         attempts = 1;
200         lastSleep = minSleep;
201         lockedConnect();
202     }
203
204     @Holding("this")
205     private void lockedConnect() {
206         final ListenableFuture<NetconfClientSession> connectFuture;
207         try {
208             connectFuture = clientFactory.createClient(clientConfig);
209         } catch (UnsupportedConfigurationException e) {
210             onDeviceFailed(e);
211             return;
212         }
213
214         final var nextTask = new ConnectingTask(connectFuture);
215         currentTask = nextTask;
216         Futures.addCallback(connectFuture, nextTask, MoreExecutors.directExecutor());
217     }
218
219     private synchronized void connectComplete(final ConnectingTask task) {
220         // Just clear the task, if it matches our expectation
221         completeTask(task);
222     }
223
224     private void connectFailed(final ConnectingTask task, final Throwable cause) {
225         synchronized (this) {
226             if (completeTask(task)) {
227                 // Mismatched future or the connection has been cancelled: nothing else to do
228                 return;
229             }
230             LOG.debug("Connection attempt {} to {} failed", attempts, deviceId, cause);
231         }
232
233         // We are invoking callbacks, do not hold locks
234         reconnectOrFail();
235     }
236
237     @Holding("this")
238     private boolean completeTask(final ConnectingTask task) {
239         // A quick sanity check
240         if (task.equals(currentTask)) {
241             currentTask = null;
242             return false;
243         }
244         LOG.warn("Ignoring connection completion, expected {} actual {}", currentTask, task);
245         return true;
246     }
247
248     @Override
249     protected synchronized void removeRegistration() {
250         if (currentTask != null) {
251             currentTask.cancel();
252             currentTask = null;
253         }
254
255         communicator.close();
256         delegate.close();
257         yanglibRegistrations.forEach(SchemaSourceRegistration::close);
258     }
259
260     @Override
261     public void onDeviceConnected(final NetconfDeviceSchema deviceSchema,
262             final NetconfSessionPreferences sessionPreferences, final RemoteDeviceServices services) {
263         synchronized (this) {
264             attempts = 0;
265         }
266         delegate.onDeviceConnected(deviceSchema, sessionPreferences, services);
267     }
268
269     @Override
270     public void onDeviceDisconnected() {
271         delegate.onDeviceDisconnected();
272         reconnectOrFail();
273     }
274
275     @Override
276     public void onDeviceFailed(final Throwable throwable) {
277         // We have not reported onDeviceConnected(), so from the view of delete we are still connecting
278         LOG.debug("Connection attempt failed", throwable);
279         reconnectOrFail();
280     }
281
282     @Override
283     public void onNotification(final DOMNotification domNotification) {
284         delegate.onNotification(domNotification);
285     }
286
287     private void reconnectOrFail() {
288         final var ex = scheduleReconnect();
289         if (ex != null) {
290             delegate.onDeviceFailed(ex);
291         }
292     }
293
294     private synchronized Exception scheduleReconnect() {
295         if (isClosed()) {
296             return null;
297         }
298
299         final long delayMillis;
300
301         // We have exceeded the number of connection attempts
302         if (maxAttempts > 0 && attempts >= maxAttempts) {
303             LOG.info("Failed to connect {} after {} attempts, not attempting", deviceId, attempts);
304             return new ConnectGivenUpException("Given up connecting " + deviceId + " after " + attempts + " attempts");
305         }
306
307         // First connection attempt gets initialized to minimum sleep, each subsequent is exponentially backed off
308         // by sleepFactor.
309         if (attempts != 0) {
310             final long nextSleep = (long) (lastSleep * sleepFactor);
311             // check for overflow
312             delayMillis = nextSleep >= 0 ? nextSleep : Long.MAX_VALUE;
313         } else {
314             delayMillis = minSleep;
315         }
316
317         attempts++;
318         lastSleep = delayMillis;
319         LOG.debug("Retrying {} connection attempt {} after {} milliseconds", deviceId, attempts, delayMillis);
320
321         // Schedule a task for the right time. We always go through the executor to eliminate the special case of
322         // immediate reconnect. While we could check and got to lockedConnect(), it makes for a rare special case.
323         // That special case makes for more code paths to test and introduces additional uncertainty as to whether
324         // the attempt was executed on this thread or not.
325         currentTask = new SleepingTask(timer.newTimeout(this::reconnect, delayMillis, TimeUnit.MILLISECONDS));
326         return null;
327     }
328
329     private synchronized void reconnect(final Timeout timeout) {
330         currentTask = null;
331         if (notClosed()) {
332             lockedConnect();
333         }
334     }
335
336     private static List<SchemaSourceRegistration<?>> registerDeviceSchemaSources(final RemoteDeviceId remoteDeviceId,
337             final NetconfNode node, final SchemaResourcesDTO resources) {
338         final var yangLibrary = node.getYangLibrary();
339         if (yangLibrary != null) {
340             final Uri uri = yangLibrary.getYangLibraryUrl();
341             if (uri != null) {
342                 final var registrations = new ArrayList<SchemaSourceRegistration<?>>();
343                 final var yangLibURL = uri.getValue();
344                 final var schemaRegistry = resources.getSchemaRegistry();
345
346                 // pre register yang library sources as fallback schemas to schema registry
347                 final var yangLibUsername = yangLibrary.getUsername();
348                 final var yangLigPassword = yangLibrary.getPassword();
349                 final var schemas = yangLibUsername != null && yangLigPassword != null
350                     ? LibraryModulesSchemas.create(yangLibURL, yangLibUsername, yangLigPassword)
351                         : LibraryModulesSchemas.create(yangLibURL);
352
353                 for (var entry : schemas.getAvailableModels().entrySet()) {
354                     registrations.add(schemaRegistry.registerSchemaSource(new LibrarySchemaSourceProvider(
355                         remoteDeviceId, schemas.getAvailableModels()),
356                         PotentialSchemaSource.create(entry.getKey(), YangTextSchemaSource.class,
357                             PotentialSchemaSource.Costs.REMOTE_IO.getValue())));
358                 }
359                 return List.copyOf(registrations);
360             }
361         }
362
363         return List.of();
364     }
365
366     @VisibleForTesting
367     synchronized long attempts() {
368         return attempts;
369     }
370 }