fc7d897c23d05f0cb26d4eb76360cbf524166df1
[netconf.git] / apps / netconf-topology / src / main / java / org / opendaylight / netconf / topology / spi / AbstractNetconfTopology.java
1 /*
2  * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.netconf.topology.spi;
9
10 import static java.util.Objects.requireNonNull;
11
12 import com.google.common.annotations.VisibleForTesting;
13 import com.google.common.collect.Lists;
14 import com.google.common.util.concurrent.FutureCallback;
15 import com.google.common.util.concurrent.Futures;
16 import com.google.common.util.concurrent.ListenableFuture;
17 import com.google.common.util.concurrent.ListeningExecutorService;
18 import com.google.common.util.concurrent.MoreExecutors;
19 import io.netty.util.concurrent.EventExecutor;
20 import java.net.URL;
21 import java.util.ArrayList;
22 import java.util.HashMap;
23 import java.util.List;
24 import java.util.Map;
25 import java.util.concurrent.ExecutionException;
26 import org.opendaylight.aaa.encrypt.AAAEncryptionService;
27 import org.opendaylight.controller.config.threadpool.ScheduledThreadPool;
28 import org.opendaylight.controller.config.threadpool.ThreadPool;
29 import org.opendaylight.mdsal.binding.api.DataBroker;
30 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
31 import org.opendaylight.mdsal.dom.api.DOMMountPointService;
32 import org.opendaylight.netconf.client.NetconfClientDispatcher;
33 import org.opendaylight.netconf.client.NetconfClientSessionListener;
34 import org.opendaylight.netconf.client.conf.NetconfClientConfiguration;
35 import org.opendaylight.netconf.client.conf.NetconfReconnectingClientConfiguration;
36 import org.opendaylight.netconf.client.conf.NetconfReconnectingClientConfigurationBuilder;
37 import org.opendaylight.netconf.client.mdsal.DatastoreBackedPublicKeyAuth;
38 import org.opendaylight.netconf.client.mdsal.LibraryModulesSchemas;
39 import org.opendaylight.netconf.client.mdsal.LibrarySchemaSourceProvider;
40 import org.opendaylight.netconf.client.mdsal.NetconfDevice.SchemaResourcesDTO;
41 import org.opendaylight.netconf.client.mdsal.NetconfDeviceBuilder;
42 import org.opendaylight.netconf.client.mdsal.NetconfDeviceCommunicator;
43 import org.opendaylight.netconf.client.mdsal.SchemalessNetconfDevice;
44 import org.opendaylight.netconf.client.mdsal.api.BaseNetconfSchemas;
45 import org.opendaylight.netconf.client.mdsal.api.CredentialProvider;
46 import org.opendaylight.netconf.client.mdsal.api.DeviceActionFactory;
47 import org.opendaylight.netconf.client.mdsal.api.RemoteDevice;
48 import org.opendaylight.netconf.client.mdsal.api.RemoteDeviceHandler;
49 import org.opendaylight.netconf.client.mdsal.api.RemoteDeviceId;
50 import org.opendaylight.netconf.client.mdsal.api.SchemaResourceManager;
51 import org.opendaylight.netconf.client.mdsal.api.SslHandlerFactoryProvider;
52 import org.opendaylight.netconf.client.mdsal.spi.KeepaliveSalFacade;
53 import org.opendaylight.netconf.nettyutil.ReconnectStrategyFactory;
54 import org.opendaylight.netconf.nettyutil.TimedReconnectStrategyFactory;
55 import org.opendaylight.netconf.nettyutil.handler.ssh.authentication.AuthenticationHandler;
56 import org.opendaylight.netconf.nettyutil.handler.ssh.authentication.LoginPasswordHandler;
57 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.Uri;
58 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.device.rev230430.connection.parameters.Protocol.Name;
59 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.device.rev230430.credentials.Credentials;
60 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.device.rev230430.credentials.credentials.KeyAuth;
61 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.device.rev230430.credentials.credentials.LoginPw;
62 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.device.rev230430.credentials.credentials.LoginPwUnencrypted;
63 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.optional.rev221225.NetconfNodeAugmentedOptional;
64 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev221225.NetconfNode;
65 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology;
66 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
67 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.TopologyId;
68 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
69 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyBuilder;
70 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey;
71 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
72 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
73 import org.opendaylight.yangtools.yang.common.Empty;
74 import org.opendaylight.yangtools.yang.model.repo.api.SourceIdentifier;
75 import org.opendaylight.yangtools.yang.model.repo.api.YangTextSchemaSource;
76 import org.opendaylight.yangtools.yang.model.repo.spi.PotentialSchemaSource;
77 import org.opendaylight.yangtools.yang.model.repo.spi.SchemaSourceRegistration;
78 import org.opendaylight.yangtools.yang.model.repo.spi.SchemaSourceRegistry;
79 import org.slf4j.Logger;
80 import org.slf4j.LoggerFactory;
81
/**
 * Base class for NETCONF topology implementations. It keeps one {@link NetconfConnectorDTO} per {@link NodeId}
 * and offers the building blocks for creating, replacing and closing those connectors.
 */
public abstract class AbstractNetconfTopology {
    private static final Logger LOG = LoggerFactory.getLogger(AbstractNetconfTopology.class);

    // Connectors currently tracked by this instance, keyed by node identifier. Modified by lockedEnsureNode(),
    // lockedDeleteNode(), deleteAllNodes() and setupConnection().
    private final HashMap<NodeId, NetconfConnectorDTO> activeConnectors = new HashMap<>();
    // Passed to NetconfDeviceCommunicator.initializeRemoteConnection() when a connection is set up
    private final NetconfClientDispatcher clientDispatcher;
    // Handed to the TimedReconnectStrategyFactory created in getClientConfig()
    private final EventExecutor eventExecutor;
    // Handed to NetconfDeviceBuilder for schema-aware devices
    private final DeviceActionFactory deviceActionFactory;
    // Handed to DatastoreBackedPublicKeyAuth for key-based credentials
    private final CredentialProvider credentialProvider;
    // Source of the SSL handler factory used when the node protocol is TLS
    private final SslHandlerFactoryProvider sslHandlerFactoryProvider;
    // Source of per-device SchemaResourcesDTO instances
    private final SchemaResourceManager schemaManager;
    // Handed to both SchemalessNetconfDevice and NetconfDeviceBuilder
    private final BaseNetconfSchemas baseSchemas;

    // Supplies the executor given to KeepaliveSalFacade
    protected final ScheduledThreadPool keepaliveExecutor;
    // Listening decorator around the executor of the ThreadPool passed to the constructor
    protected final ListeningExecutorService processingExecutor;
    // Used to initialize the topology in the constructor and passed to NetconfTopologyDeviceSalFacade
    protected final DataBroker dataBroker;
    // Passed to NetconfTopologyDeviceSalFacade
    protected final DOMMountPointService mountPointService;
    // Identifier of the topology this instance manages, never null
    protected final String topologyId;
    // Used to decrypt LoginPw passwords and handed to DatastoreBackedPublicKeyAuth
    protected final AAAEncryptionService encryptionService;
100
101     protected AbstractNetconfTopology(final String topologyId, final NetconfClientDispatcher clientDispatcher,
102                                       final EventExecutor eventExecutor, final ScheduledThreadPool keepaliveExecutor,
103                                       final ThreadPool processingExecutor, final SchemaResourceManager schemaManager,
104                                       final DataBroker dataBroker, final DOMMountPointService mountPointService,
105                                       final AAAEncryptionService encryptionService,
106                                       final DeviceActionFactory deviceActionFactory,
107                                       final BaseNetconfSchemas baseSchemas,
108                                       final CredentialProvider credentialProvider,
109                                       final SslHandlerFactoryProvider sslHandlerFactoryProvider) {
110         this.topologyId = requireNonNull(topologyId);
111         this.clientDispatcher = clientDispatcher;
112         this.eventExecutor = eventExecutor;
113         this.keepaliveExecutor = keepaliveExecutor;
114         this.processingExecutor = MoreExecutors.listeningDecorator(processingExecutor.getExecutor());
115         this.schemaManager = requireNonNull(schemaManager);
116         this.deviceActionFactory = deviceActionFactory;
117         this.dataBroker = requireNonNull(dataBroker);
118         this.mountPointService = mountPointService;
119         this.encryptionService = encryptionService;
120         this.baseSchemas = requireNonNull(baseSchemas);
121         this.credentialProvider = requireNonNull(credentialProvider);
122         this.sslHandlerFactoryProvider = requireNonNull(sslHandlerFactoryProvider);
123
124         // FIXME: this should be a put(), as we are initializing and will be re-populating the datastore with all the
125         //        devices. Whatever has been there before should be nuked to properly re-align lifecycle.
126         final var wtx = dataBroker.newWriteOnlyTransaction();
127         wtx.merge(LogicalDatastoreType.OPERATIONAL, InstanceIdentifier.builder(NetworkTopology.class)
128             .child(Topology.class, new TopologyKey(new TopologyId(topologyId)))
129             .build(), new TopologyBuilder().setTopologyId(new TopologyId(topologyId)).build());
130         final var future = wtx.commit();
131         try {
132             future.get();
133         } catch (InterruptedException | ExecutionException e) {
134             LOG.error("Unable to initialize topology {}", topologyId, e);
135             throw new IllegalStateException(e);
136         }
137
138         LOG.debug("Topology {} initialized", topologyId);
139     }
140
141     // Non-final for testing
142     protected void ensureNode(final Node node) {
143         lockedEnsureNode(node);
144     }
145
146     private synchronized void lockedEnsureNode(final Node node) {
147         final var nodeId = node.requireNodeId();
148         final var prev = activeConnectors.remove(nodeId);
149         if (prev != null) {
150             LOG.info("RemoteDevice{{}} was already configured, disconnecting", nodeId);
151             prev.close();
152         }
153
154         LOG.info("Connecting RemoteDevice{{}}, with config {}", nodeId, hideCredentials(node));
155         setupConnection(nodeId, node);
156     }
157
158     // Non-final for testing
159     protected void deleteNode(final NodeId nodeId) {
160         lockedDeleteNode(nodeId);
161     }
162
163     private synchronized void lockedDeleteNode(final NodeId nodeId) {
164         final var nodeName = nodeId.getValue();
165         LOG.debug("Disconnecting RemoteDevice{{}}", nodeName);
166
167         final var connectorDTO = activeConnectors.remove(nodeId);
168         if (connectorDTO != null) {
169             connectorDTO.close();
170         }
171     }
172
173     protected final synchronized void deleteAllNodes() {
174         activeConnectors.values().forEach(NetconfConnectorDTO::close);
175         activeConnectors.clear();
176     }
177
178     protected final void setupConnection(final NodeId nodeId, final Node configNode) {
179         final NetconfNode netconfNode = configNode.augmentation(NetconfNode.class);
180         final NetconfNodeAugmentedOptional nodeOptional = configNode.augmentation(NetconfNodeAugmentedOptional.class);
181
182         requireNonNull(netconfNode.getHost());
183         requireNonNull(netconfNode.getPort());
184
185         final NetconfConnectorDTO deviceCommunicatorDTO = createDeviceCommunicator(nodeId, netconfNode, nodeOptional);
186         final NetconfDeviceCommunicator deviceCommunicator = deviceCommunicatorDTO.getCommunicator();
187         final NetconfClientSessionListener netconfClientSessionListener = deviceCommunicatorDTO.getSessionListener();
188         final NetconfReconnectingClientConfiguration clientConfig =
189                 getClientConfig(netconfClientSessionListener, netconfNode, nodeId);
190         final ListenableFuture<Empty> future =
191                 deviceCommunicator.initializeRemoteConnection(clientDispatcher, clientConfig);
192
193         activeConnectors.put(nodeId, deviceCommunicatorDTO);
194
195         Futures.addCallback(future, new FutureCallback<>() {
196             @Override
197             public void onSuccess(final Empty result) {
198                 LOG.debug("Connector for {} started succesfully", nodeId.getValue());
199             }
200
201             @Override
202             public void onFailure(final Throwable throwable) {
203                 LOG.error("Connector for {} failed", nodeId.getValue(), throwable);
204                 // remove this node from active connectors?
205             }
206         }, MoreExecutors.directExecutor());
207     }
208
209     protected NetconfConnectorDTO createDeviceCommunicator(final NodeId nodeId, final NetconfNode node,
210             final NetconfNodeAugmentedOptional nodeOptional) {
211         final var deviceId = NetconfNodeUtils.toRemoteDeviceId(nodeId, node);
212         final long keepaliveDelay = node.requireKeepaliveDelay().toJava();
213
214         final var deviceSalFacade = createSalFacade(deviceId, node.requireLockDatastore());
215         // The facade we are going it present to NetconfDevice
216         RemoteDeviceHandler salFacade;
217         final KeepaliveSalFacade keepAliveFacade;
218         if (keepaliveDelay > 0) {
219             LOG.info("Adding keepalive facade, for device {}", nodeId);
220             salFacade = keepAliveFacade = new KeepaliveSalFacade(deviceId, deviceSalFacade,
221                 keepaliveExecutor.getExecutor(), keepaliveDelay, node.requireDefaultRequestTimeoutMillis().toJava());
222         } else {
223             salFacade = deviceSalFacade;
224             keepAliveFacade = null;
225         }
226
227         // Setup reconnection on empty context, if so configured
228         if (nodeOptional != null && nodeOptional.getIgnoreMissingSchemaSources().getAllowed()) {
229             LOG.warn("Ignoring missing schema sources is not currently implemented for {}", deviceId);
230         }
231
232         final RemoteDevice<NetconfDeviceCommunicator> device;
233         final List<SchemaSourceRegistration<?>> yanglibRegistrations;
234         if (node.requireSchemaless()) {
235             device = new SchemalessNetconfDevice(baseSchemas, deviceId, salFacade);
236             yanglibRegistrations = List.of();
237         } else {
238             final SchemaResourcesDTO resources = schemaManager.getSchemaResources(node.getSchemaCacheDirectory(),
239                 nodeId.getValue());
240             device = new NetconfDeviceBuilder()
241                 .setReconnectOnSchemasChange(node.requireReconnectOnChangedSchema())
242                 .setSchemaResourcesDTO(resources)
243                 .setGlobalProcessingExecutor(processingExecutor)
244                 .setId(deviceId)
245                 .setSalFacade(salFacade)
246                 .setDeviceActionFactory(deviceActionFactory)
247                 .setBaseSchemas(baseSchemas)
248                 .build();
249             yanglibRegistrations = registerDeviceSchemaSources(deviceId, node, resources);
250         }
251
252         final int rpcMessageLimit = node.requireConcurrentRpcLimit().toJava();
253         if (rpcMessageLimit < 1) {
254             LOG.info("Concurrent rpc limit is smaller than 1, no limit will be enforced for device {}", deviceId);
255         }
256
257         final var netconfDeviceCommunicator = new NetconfDeviceCommunicator(deviceId, device, rpcMessageLimit,
258             NetconfNodeUtils.extractUserCapabilities(node));
259
260         if (keepAliveFacade != null) {
261             keepAliveFacade.setListener(netconfDeviceCommunicator);
262         }
263
264         return new NetconfConnectorDTO(netconfDeviceCommunicator, salFacade, yanglibRegistrations);
265     }
266
267     protected RemoteDeviceHandler createSalFacade(final RemoteDeviceId deviceId, final boolean lockDatastore) {
268         return new NetconfTopologyDeviceSalFacade(deviceId, mountPointService, lockDatastore, dataBroker);
269     }
270
271     private static List<SchemaSourceRegistration<?>> registerDeviceSchemaSources(final RemoteDeviceId remoteDeviceId,
272             final NetconfNode node, final SchemaResourcesDTO resources) {
273         final var yangLibrary = node.getYangLibrary();
274         if (yangLibrary != null) {
275             final Uri uri = yangLibrary.getYangLibraryUrl();
276             if (uri != null) {
277                 final List<SchemaSourceRegistration<?>> registrations = new ArrayList<>();
278                 final String yangLibURL = uri.getValue();
279                 final SchemaSourceRegistry schemaRegistry = resources.getSchemaRegistry();
280
281                 // pre register yang library sources as fallback schemas to schema registry
282                 final LibraryModulesSchemas schemas;
283                 final String yangLibUsername = yangLibrary.getUsername();
284                 final String yangLigPassword = yangLibrary.getPassword();
285                 if (yangLibUsername != null && yangLigPassword != null) {
286                     schemas = LibraryModulesSchemas.create(yangLibURL, yangLibUsername, yangLigPassword);
287                 } else {
288                     schemas = LibraryModulesSchemas.create(yangLibURL);
289                 }
290
291                 for (final Map.Entry<SourceIdentifier, URL> entry : schemas.getAvailableModels().entrySet()) {
292                     registrations.add(schemaRegistry.registerSchemaSource(new LibrarySchemaSourceProvider(
293                         remoteDeviceId, schemas.getAvailableModels()),
294                         PotentialSchemaSource.create(entry.getKey(), YangTextSchemaSource.class,
295                             PotentialSchemaSource.Costs.REMOTE_IO.getValue())));
296                 }
297                 return List.copyOf(registrations);
298             }
299         }
300
301         return List.of();
302     }
303
304     public NetconfReconnectingClientConfiguration getClientConfig(final NetconfClientSessionListener listener,
305                                                                   final NetconfNode node, final NodeId nodeId) {
306         final ReconnectStrategyFactory sf = new TimedReconnectStrategyFactory(eventExecutor,
307                 node.requireMaxConnectionAttempts().toJava(), node.requireBetweenAttemptsTimeoutMillis().toJava(),
308                 node.requireSleepFactor().decimalValue());
309         final NetconfReconnectingClientConfigurationBuilder reconnectingClientConfigurationBuilder;
310         final var protocol = node.getProtocol();
311         if (node.requireTcpOnly()) {
312             reconnectingClientConfigurationBuilder = NetconfReconnectingClientConfigurationBuilder.create()
313                     .withProtocol(NetconfClientConfiguration.NetconfClientProtocol.TCP)
314                     .withAuthHandler(getHandlerFromCredentials(node.getCredentials()));
315         } else if (protocol == null || protocol.getName() == Name.SSH) {
316             reconnectingClientConfigurationBuilder = NetconfReconnectingClientConfigurationBuilder.create()
317                     .withProtocol(NetconfClientConfiguration.NetconfClientProtocol.SSH)
318                     .withAuthHandler(getHandlerFromCredentials(node.getCredentials()));
319         } else if (protocol.getName() == Name.TLS) {
320             reconnectingClientConfigurationBuilder = NetconfReconnectingClientConfigurationBuilder.create()
321                 .withSslHandlerFactory(sslHandlerFactoryProvider.getSslHandlerFactory(protocol.getSpecification()))
322                 .withProtocol(NetconfClientConfiguration.NetconfClientProtocol.TLS);
323         } else {
324             throw new IllegalStateException("Unsupported protocol type: " + protocol.getName());
325         }
326
327         if (node.getOdlHelloMessageCapabilities() != null) {
328             reconnectingClientConfigurationBuilder.withOdlHelloCapabilities(
329                     Lists.newArrayList(node.getOdlHelloMessageCapabilities().getCapability()));
330         }
331
332         return reconnectingClientConfigurationBuilder
333                 .withName(nodeId.getValue())
334                 .withAddress(NetconfNodeUtils.toInetSocketAddress(node))
335                 .withConnectionTimeoutMillis(node.requireConnectionTimeoutMillis().toJava())
336                 .withReconnectStrategy(sf.createReconnectStrategy())
337                 .withConnectStrategyFactory(sf)
338                 .withSessionListener(listener)
339                 .build();
340     }
341
342     private AuthenticationHandler getHandlerFromCredentials(final Credentials credentials) {
343         if (credentials
344                 instanceof org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.device.rev230430
345                     .credentials.credentials.LoginPassword loginPassword) {
346             return new LoginPasswordHandler(loginPassword.getUsername(), loginPassword.getPassword());
347         }
348         if (credentials instanceof LoginPwUnencrypted unencrypted) {
349             final var loginPassword = unencrypted.getLoginPasswordUnencrypted();
350             return new LoginPasswordHandler(loginPassword.getUsername(), loginPassword.getPassword());
351         }
352         if (credentials instanceof LoginPw loginPw) {
353             final var loginPassword = loginPw.getLoginPassword();
354             return new LoginPasswordHandler(loginPassword.getUsername(),
355                     encryptionService.decrypt(loginPassword.getPassword()));
356         }
357         if (credentials instanceof KeyAuth keyAuth) {
358             final var keyPair = keyAuth.getKeyBased();
359             return new DatastoreBackedPublicKeyAuth(keyPair.getUsername(), keyPair.getKeyId(), credentialProvider,
360                 encryptionService);
361         }
362         throw new IllegalStateException("Unsupported credential type: " + credentials.getClass());
363     }
364
365     /**
366      * Hiding of private credentials from node configuration (credentials data is replaced by asterisks).
367      *
368      * @param nodeConfiguration Node configuration container.
369      * @return String representation of node configuration with credentials replaced by asterisks.
370      */
371     @VisibleForTesting
372     public static final String hideCredentials(final Node nodeConfiguration) {
373         final var netconfNodeAugmentation = nodeConfiguration.augmentation(NetconfNode.class);
374         final var nodeCredentials = netconfNodeAugmentation.getCredentials().toString();
375         final var nodeConfigurationString = nodeConfiguration.toString();
376         return nodeConfigurationString.replace(nodeCredentials, "***");
377     }
378 }