Centralize NetconfNode/InetSocketAddress conversion
[netconf.git] / netconf / netconf-topology-singleton / src / main / java / org / opendaylight / netconf / topology / singleton / impl / NetconfTopologyManager.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.netconf.topology.singleton.impl;
9
10 import static java.util.Objects.requireNonNull;
11
12 import akka.actor.ActorSystem;
13 import akka.util.Timeout;
14 import com.google.common.annotations.VisibleForTesting;
15 import com.google.common.util.concurrent.FutureCallback;
16 import com.google.common.util.concurrent.ListeningExecutorService;
17 import com.google.common.util.concurrent.MoreExecutors;
18 import io.netty.util.concurrent.EventExecutor;
19 import java.time.Duration;
20 import java.util.Collection;
21 import java.util.Map;
22 import java.util.concurrent.ConcurrentHashMap;
23 import java.util.concurrent.ScheduledExecutorService;
24 import org.opendaylight.aaa.encrypt.AAAEncryptionService;
25 import org.opendaylight.controller.cluster.ActorSystemProvider;
26 import org.opendaylight.controller.config.threadpool.ScheduledThreadPool;
27 import org.opendaylight.controller.config.threadpool.ThreadPool;
28 import org.opendaylight.mdsal.binding.api.ClusteredDataTreeChangeListener;
29 import org.opendaylight.mdsal.binding.api.DataBroker;
30 import org.opendaylight.mdsal.binding.api.DataObjectModification;
31 import org.opendaylight.mdsal.binding.api.DataTreeIdentifier;
32 import org.opendaylight.mdsal.binding.api.DataTreeModification;
33 import org.opendaylight.mdsal.binding.api.RpcProviderService;
34 import org.opendaylight.mdsal.binding.api.WriteTransaction;
35 import org.opendaylight.mdsal.common.api.CommitInfo;
36 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
37 import org.opendaylight.mdsal.dom.api.DOMActionProviderService;
38 import org.opendaylight.mdsal.dom.api.DOMMountPointService;
39 import org.opendaylight.mdsal.dom.api.DOMRpcProviderService;
40 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
41 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceRegistration;
42 import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier;
43 import org.opendaylight.netconf.client.NetconfClientDispatcher;
44 import org.opendaylight.netconf.sal.connect.api.DeviceActionFactory;
45 import org.opendaylight.netconf.sal.connect.api.SchemaResourceManager;
46 import org.opendaylight.netconf.sal.connect.netconf.schema.mapping.BaseNetconfSchemas;
47 import org.opendaylight.netconf.sal.connect.util.NetconfTopologyRPCProvider;
48 import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
49 import org.opendaylight.netconf.topology.singleton.api.NetconfTopologySingletonService;
50 import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup;
51 import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup.NetconfTopologySetupBuilder;
52 import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologyUtils;
53 import org.opendaylight.netconf.topology.spi.NetconfNodeUtils;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNode;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNodeTopologyService;
56 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.topology.singleton.config.rev170419.Config;
57 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology;
58 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopologyBuilder;
59 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
60 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.TopologyId;
61 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
62 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyBuilder;
63 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey;
64 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
65 import org.opendaylight.yangtools.concepts.ListenerRegistration;
66 import org.opendaylight.yangtools.concepts.Registration;
67 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
68 import org.slf4j.Logger;
69 import org.slf4j.LoggerFactory;
70
71 public class NetconfTopologyManager
72         implements ClusteredDataTreeChangeListener<Node>, NetconfTopologySingletonService, AutoCloseable {
73
74     private static final Logger LOG = LoggerFactory.getLogger(NetconfTopologyManager.class);
75
76     private final Map<InstanceIdentifier<Node>, NetconfTopologyContext> contexts = new ConcurrentHashMap<>();
77     private final Map<InstanceIdentifier<Node>, ClusterSingletonServiceRegistration>
78             clusterRegistrations = new ConcurrentHashMap<>();
79
80     private final BaseNetconfSchemas baseSchemas;
81     private final DataBroker dataBroker;
82     private final DOMRpcProviderService rpcProviderRegistry;
83     private final DOMActionProviderService actionProviderRegistry;
84     private final ClusterSingletonServiceProvider clusterSingletonServiceProvider;
85     private final ScheduledExecutorService keepaliveExecutor;
86     private final ListeningExecutorService processingExecutor;
87     private final ActorSystem actorSystem;
88     private final EventExecutor eventExecutor;
89     private final NetconfClientDispatcher clientDispatcher;
90     private final String topologyId;
91     private final Duration writeTxIdleTimeout;
92     private final DOMMountPointService mountPointService;
93     private final AAAEncryptionService encryptionService;
94     private final RpcProviderService rpcProviderService;
95     private final DeviceActionFactory deviceActionFactory;
96     private final SchemaResourceManager resourceManager;
97
98     private ListenerRegistration<NetconfTopologyManager> dataChangeListenerRegistration;
99     private Registration rpcReg;
100     private String privateKeyPath;
101     private String privateKeyPassphrase;
102
103     public NetconfTopologyManager(final BaseNetconfSchemas baseSchemas, final DataBroker dataBroker,
104                                   final DOMRpcProviderService rpcProviderRegistry,
105                                   final DOMActionProviderService actionProviderService,
106                                   final ClusterSingletonServiceProvider clusterSingletonServiceProvider,
107                                   final ScheduledThreadPool keepaliveExecutor, final ThreadPool processingExecutor,
108                                   final ActorSystemProvider actorSystemProvider,
109                                   final EventExecutor eventExecutor, final NetconfClientDispatcher clientDispatcher,
110                                   final String topologyId, final Config config,
111                                   final DOMMountPointService mountPointService,
112                                   final AAAEncryptionService encryptionService,
113                                   final RpcProviderService rpcProviderService,
114                                   final DeviceActionFactory deviceActionFactory,
115                                   final SchemaResourceManager resourceManager) {
116         this.baseSchemas = requireNonNull(baseSchemas);
117         this.dataBroker = requireNonNull(dataBroker);
118         this.rpcProviderRegistry = requireNonNull(rpcProviderRegistry);
119         this.actionProviderRegistry = requireNonNull(actionProviderService);
120         this.clusterSingletonServiceProvider = requireNonNull(clusterSingletonServiceProvider);
121         this.keepaliveExecutor = keepaliveExecutor.getExecutor();
122         this.processingExecutor = MoreExecutors.listeningDecorator(processingExecutor.getExecutor());
123         this.actorSystem = requireNonNull(actorSystemProvider).getActorSystem();
124         this.eventExecutor = requireNonNull(eventExecutor);
125         this.clientDispatcher = requireNonNull(clientDispatcher);
126         this.topologyId = requireNonNull(topologyId);
127         this.writeTxIdleTimeout = Duration.ofSeconds(config.getWriteTransactionIdleTimeout().toJava());
128         this.mountPointService = mountPointService;
129         this.encryptionService = requireNonNull(encryptionService);
130         this.rpcProviderService = requireNonNull(rpcProviderService);
131         this.deviceActionFactory = requireNonNull(deviceActionFactory);
132         this.resourceManager = requireNonNull(resourceManager);
133     }
134
135     // Blueprint init method
136     public void init() {
137         dataChangeListenerRegistration = registerDataTreeChangeListener();
138         rpcReg = rpcProviderService.registerRpcImplementation(NetconfNodeTopologyService.class,
139             new NetconfTopologyRPCProvider(dataBroker, encryptionService, topologyId));
140     }
141
142     @Override
143     public void onDataTreeChanged(final Collection<DataTreeModification<Node>> changes) {
144         for (final DataTreeModification<Node> change : changes) {
145             final DataObjectModification<Node> rootNode = change.getRootNode();
146             final InstanceIdentifier<Node> dataModifIdent = change.getRootPath().getRootIdentifier();
147             final NodeId nodeId = NetconfTopologyUtils.getNodeId(rootNode.getIdentifier());
148             switch (rootNode.getModificationType()) {
149                 case SUBTREE_MODIFIED:
150                     LOG.debug("Config for node {} updated", nodeId);
151                     refreshNetconfDeviceContext(dataModifIdent, rootNode.getDataAfter());
152                     break;
153                 case WRITE:
154                     if (contexts.containsKey(dataModifIdent)) {
155                         LOG.debug("RemoteDevice{{}} was already configured, reconfiguring node...", nodeId);
156                         refreshNetconfDeviceContext(dataModifIdent, rootNode.getDataAfter());
157                     } else {
158                         LOG.debug("Config for node {} created", nodeId);
159                         startNetconfDeviceContext(dataModifIdent, rootNode.getDataAfter());
160                     }
161                     break;
162                 case DELETE:
163                     LOG.debug("Config for node {} deleted", nodeId);
164                     stopNetconfDeviceContext(dataModifIdent);
165                     break;
166                 default:
167                     LOG.warn("Unknown operation for {}.", nodeId);
168             }
169         }
170     }
171
172     private void refreshNetconfDeviceContext(final InstanceIdentifier<Node> instanceIdentifier, final Node node) {
173         final NetconfTopologyContext context = contexts.get(instanceIdentifier);
174         context.refresh(createSetup(instanceIdentifier, node));
175     }
176
177     // ClusterSingletonServiceRegistration registerClusterSingletonService method throws a Runtime exception if there
178     // are problems with registration and client has to deal with it. Only thing we can do if this error occurs is to
179     // retry registration several times and log the error.
180     // TODO change to a specific documented Exception when changed in ClusterSingletonServiceProvider
181     @SuppressWarnings("checkstyle:IllegalCatch")
182     private void startNetconfDeviceContext(final InstanceIdentifier<Node> instanceIdentifier, final Node node) {
183         final NetconfNode netconfNode = requireNonNull(node.augmentation(NetconfNode.class));
184
185         final Timeout actorResponseWaitTime = Timeout.create(
186                 Duration.ofSeconds(netconfNode.getActorResponseWaitTime().toJava()));
187
188         final ServiceGroupIdentifier serviceGroupIdent =
189                 ServiceGroupIdentifier.create(instanceIdentifier.toString());
190
191         final NetconfTopologyContext newNetconfTopologyContext = newNetconfTopologyContext(
192             createSetup(instanceIdentifier, node), serviceGroupIdent, actorResponseWaitTime, deviceActionFactory);
193
194         int tries = 3;
195         while (true) {
196             try {
197                 final ClusterSingletonServiceRegistration clusterSingletonServiceRegistration =
198                         clusterSingletonServiceProvider.registerClusterSingletonService(newNetconfTopologyContext);
199                 clusterRegistrations.put(instanceIdentifier, clusterSingletonServiceRegistration);
200                 contexts.put(instanceIdentifier, newNetconfTopologyContext);
201                 break;
202             } catch (final RuntimeException e) {
203                 LOG.warn("Unable to register cluster singleton service {}, trying again", newNetconfTopologyContext, e);
204
205                 if (--tries <= 0) {
206                     LOG.error("Unable to register cluster singleton service {} - done trying, closing topology context",
207                             newNetconfTopologyContext, e);
208                     close(newNetconfTopologyContext);
209                     break;
210                 }
211             }
212         }
213     }
214
215     private void stopNetconfDeviceContext(final InstanceIdentifier<Node> instanceIdentifier) {
216         final NetconfTopologyContext netconfTopologyContext = contexts.remove(instanceIdentifier);
217         if (netconfTopologyContext != null) {
218             close(clusterRegistrations.remove(instanceIdentifier));
219             close(netconfTopologyContext);
220         }
221     }
222
223     @VisibleForTesting
224     protected NetconfTopologyContext newNetconfTopologyContext(final NetconfTopologySetup setup,
225             final ServiceGroupIdentifier serviceGroupIdent, final Timeout actorResponseWaitTime,
226             final DeviceActionFactory deviceActionFact) {
227         return new NetconfTopologyContext(setup, serviceGroupIdent, actorResponseWaitTime, mountPointService,
228             deviceActionFact);
229     }
230
231     @Override
232     public void close() {
233         if (rpcReg != null) {
234             rpcReg.close();
235             rpcReg = null;
236         }
237         if (dataChangeListenerRegistration != null) {
238             dataChangeListenerRegistration.close();
239             dataChangeListenerRegistration = null;
240         }
241
242         contexts.values().forEach(NetconfTopologyManager::close);
243         clusterRegistrations.values().forEach(NetconfTopologyManager::close);
244
245         contexts.clear();
246         clusterRegistrations.clear();
247     }
248
249     @SuppressWarnings("checkstyle:IllegalCatch")
250     private static void close(final AutoCloseable closeable) {
251         try {
252             closeable.close();
253         } catch (Exception e) {
254             LOG.warn("Error closing {}", closeable, e);
255         }
256     }
257
258     /**
259      * Sets the private key path from location specified in configuration file using blueprint.
260      */
261     public void setPrivateKeyPath(final String privateKeyPath) {
262         this.privateKeyPath = privateKeyPath;
263     }
264
265     /**
266      * Sets the private key passphrase from location specified in configuration file using blueprint.
267      */
268     public void setPrivateKeyPassphrase(final String privateKeyPassphrase) {
269         this.privateKeyPassphrase = privateKeyPassphrase;
270     }
271
272     private ListenerRegistration<NetconfTopologyManager> registerDataTreeChangeListener() {
273         final WriteTransaction wtx = dataBroker.newWriteOnlyTransaction();
274         initTopology(wtx, LogicalDatastoreType.CONFIGURATION);
275         initTopology(wtx, LogicalDatastoreType.OPERATIONAL);
276         wtx.commit().addCallback(new FutureCallback<CommitInfo>() {
277             @Override
278             public void onSuccess(final CommitInfo result) {
279                 LOG.debug("topology initialization successful");
280             }
281
282             @Override
283             public void onFailure(final Throwable throwable) {
284                 LOG.error("Unable to initialize netconf-topology", throwable);
285             }
286         }, MoreExecutors.directExecutor());
287
288         LOG.debug("Registering datastore listener");
289         return dataBroker.registerDataTreeChangeListener(DataTreeIdentifier.create(LogicalDatastoreType.CONFIGURATION,
290             NetconfTopologyUtils.createTopologyListPath(topologyId).child(Node.class)), this);
291     }
292
293     private void initTopology(final WriteTransaction wtx, final LogicalDatastoreType datastoreType) {
294         final NetworkTopology networkTopology = new NetworkTopologyBuilder().build();
295         final InstanceIdentifier<NetworkTopology> networkTopologyId =
296                 InstanceIdentifier.builder(NetworkTopology.class).build();
297         wtx.merge(datastoreType, networkTopologyId, networkTopology);
298         final Topology topology = new TopologyBuilder().setTopologyId(new TopologyId(topologyId)).build();
299         wtx.merge(datastoreType, networkTopologyId.child(Topology.class,
300                 new TopologyKey(new TopologyId(topologyId))), topology);
301     }
302
303     private NetconfTopologySetup createSetup(final InstanceIdentifier<Node> instanceIdentifier, final Node node) {
304         final NetconfNode netconfNode = node.augmentation(NetconfNode.class);
305         final RemoteDeviceId deviceId = NetconfNodeUtils.toRemoteDeviceId(node.getNodeId(), netconfNode);
306
307         return NetconfTopologySetupBuilder.create()
308                 .setClusterSingletonServiceProvider(clusterSingletonServiceProvider)
309                 .setBaseSchemas(baseSchemas)
310                 .setDataBroker(dataBroker)
311                 .setInstanceIdentifier(instanceIdentifier)
312                 .setRpcProviderRegistry(rpcProviderRegistry)
313                 .setActionProviderRegistry(actionProviderRegistry)
314                 .setNode(node)
315                 .setActorSystem(actorSystem)
316                 .setEventExecutor(eventExecutor)
317                 .setKeepaliveExecutor(keepaliveExecutor)
318                 .setProcessingExecutor(processingExecutor)
319                 .setTopologyId(topologyId)
320                 .setNetconfClientDispatcher(clientDispatcher)
321                 .setSchemaResourceDTO(resourceManager.getSchemaResources(netconfNode, deviceId))
322                 .setIdleTimeout(writeTxIdleTimeout)
323                 .setPrivateKeyPath(privateKeyPath)
324                 .setPrivateKeyPassphrase(privateKeyPassphrase)
325                 .setEncryptionService(encryptionService)
326                 .build();
327     }
328 }