64faa511459e89bbc030efb5f87f70b7ab85a91e
[netconf.git] / netconf / netconf-topology-singleton / src / main / java / org / opendaylight / netconf / topology / singleton / impl / NetconfTopologyManager.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.netconf.topology.singleton.impl;
9
10 import static java.util.Objects.requireNonNull;
11
12 import akka.actor.ActorSystem;
13 import akka.util.Timeout;
14 import com.google.common.annotations.VisibleForTesting;
15 import com.google.common.util.concurrent.FutureCallback;
16 import com.google.common.util.concurrent.ListeningExecutorService;
17 import com.google.common.util.concurrent.MoreExecutors;
18 import io.netty.util.concurrent.EventExecutor;
19 import java.time.Duration;
20 import java.util.Collection;
21 import java.util.Map;
22 import java.util.concurrent.ConcurrentHashMap;
23 import java.util.concurrent.ScheduledExecutorService;
24 import org.opendaylight.aaa.encrypt.AAAEncryptionService;
25 import org.opendaylight.controller.cluster.ActorSystemProvider;
26 import org.opendaylight.controller.config.threadpool.ScheduledThreadPool;
27 import org.opendaylight.controller.config.threadpool.ThreadPool;
28 import org.opendaylight.mdsal.binding.api.ClusteredDataTreeChangeListener;
29 import org.opendaylight.mdsal.binding.api.DataBroker;
30 import org.opendaylight.mdsal.binding.api.DataObjectModification;
31 import org.opendaylight.mdsal.binding.api.DataTreeIdentifier;
32 import org.opendaylight.mdsal.binding.api.DataTreeModification;
33 import org.opendaylight.mdsal.binding.api.RpcProviderService;
34 import org.opendaylight.mdsal.binding.api.WriteTransaction;
35 import org.opendaylight.mdsal.common.api.CommitInfo;
36 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
37 import org.opendaylight.mdsal.dom.api.DOMActionProviderService;
38 import org.opendaylight.mdsal.dom.api.DOMMountPointService;
39 import org.opendaylight.mdsal.dom.api.DOMRpcProviderService;
40 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
41 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceRegistration;
42 import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier;
43 import org.opendaylight.netconf.client.NetconfClientDispatcher;
44 import org.opendaylight.netconf.sal.connect.api.DeviceActionFactory;
45 import org.opendaylight.netconf.sal.connect.api.SchemaResourceManager;
46 import org.opendaylight.netconf.sal.connect.netconf.schema.mapping.BaseNetconfSchemas;
47 import org.opendaylight.netconf.sal.connect.util.NetconfTopologyRPCProvider;
48 import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
49 import org.opendaylight.netconf.topology.singleton.api.NetconfTopologySingletonService;
50 import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup;
51 import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup.NetconfTopologySetupBuilder;
52 import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologyUtils;
53 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNode;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNodeTopologyService;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.topology.singleton.config.rev170419.Config;
56 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology;
57 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopologyBuilder;
58 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
59 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.TopologyId;
60 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
61 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyBuilder;
62 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey;
63 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
64 import org.opendaylight.yangtools.concepts.ListenerRegistration;
65 import org.opendaylight.yangtools.concepts.Registration;
66 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
67 import org.slf4j.Logger;
68 import org.slf4j.LoggerFactory;
69
70 public class NetconfTopologyManager
71         implements ClusteredDataTreeChangeListener<Node>, NetconfTopologySingletonService, AutoCloseable {
72
73     private static final Logger LOG = LoggerFactory.getLogger(NetconfTopologyManager.class);
74
75     private final Map<InstanceIdentifier<Node>, NetconfTopologyContext> contexts = new ConcurrentHashMap<>();
76     private final Map<InstanceIdentifier<Node>, ClusterSingletonServiceRegistration>
77             clusterRegistrations = new ConcurrentHashMap<>();
78
79     private final BaseNetconfSchemas baseSchemas;
80     private final DataBroker dataBroker;
81     private final DOMRpcProviderService rpcProviderRegistry;
82     private final DOMActionProviderService actionProviderRegistry;
83     private final ClusterSingletonServiceProvider clusterSingletonServiceProvider;
84     private final ScheduledExecutorService keepaliveExecutor;
85     private final ListeningExecutorService processingExecutor;
86     private final ActorSystem actorSystem;
87     private final EventExecutor eventExecutor;
88     private final NetconfClientDispatcher clientDispatcher;
89     private final String topologyId;
90     private final Duration writeTxIdleTimeout;
91     private final DOMMountPointService mountPointService;
92     private final AAAEncryptionService encryptionService;
93     private final RpcProviderService rpcProviderService;
94     private final DeviceActionFactory deviceActionFactory;
95     private final SchemaResourceManager resourceManager;
96
97     private ListenerRegistration<NetconfTopologyManager> dataChangeListenerRegistration;
98     private Registration rpcReg;
99     private String privateKeyPath;
100     private String privateKeyPassphrase;
101
102     public NetconfTopologyManager(final BaseNetconfSchemas baseSchemas, final DataBroker dataBroker,
103                                   final DOMRpcProviderService rpcProviderRegistry,
104                                   final DOMActionProviderService actionProviderService,
105                                   final ClusterSingletonServiceProvider clusterSingletonServiceProvider,
106                                   final ScheduledThreadPool keepaliveExecutor, final ThreadPool processingExecutor,
107                                   final ActorSystemProvider actorSystemProvider,
108                                   final EventExecutor eventExecutor, final NetconfClientDispatcher clientDispatcher,
109                                   final String topologyId, final Config config,
110                                   final DOMMountPointService mountPointService,
111                                   final AAAEncryptionService encryptionService,
112                                   final RpcProviderService rpcProviderService,
113                                   final DeviceActionFactory deviceActionFactory,
114                                   final SchemaResourceManager resourceManager) {
115         this.baseSchemas = requireNonNull(baseSchemas);
116         this.dataBroker = requireNonNull(dataBroker);
117         this.rpcProviderRegistry = requireNonNull(rpcProviderRegistry);
118         this.actionProviderRegistry = requireNonNull(actionProviderService);
119         this.clusterSingletonServiceProvider = requireNonNull(clusterSingletonServiceProvider);
120         this.keepaliveExecutor = keepaliveExecutor.getExecutor();
121         this.processingExecutor = MoreExecutors.listeningDecorator(processingExecutor.getExecutor());
122         this.actorSystem = requireNonNull(actorSystemProvider).getActorSystem();
123         this.eventExecutor = requireNonNull(eventExecutor);
124         this.clientDispatcher = requireNonNull(clientDispatcher);
125         this.topologyId = requireNonNull(topologyId);
126         this.writeTxIdleTimeout = Duration.ofSeconds(config.getWriteTransactionIdleTimeout().toJava());
127         this.mountPointService = mountPointService;
128         this.encryptionService = requireNonNull(encryptionService);
129         this.rpcProviderService = requireNonNull(rpcProviderService);
130         this.deviceActionFactory = requireNonNull(deviceActionFactory);
131         this.resourceManager = requireNonNull(resourceManager);
132     }
133
134     // Blueprint init method
135     public void init() {
136         dataChangeListenerRegistration = registerDataTreeChangeListener();
137         rpcReg = rpcProviderService.registerRpcImplementation(NetconfNodeTopologyService.class,
138             new NetconfTopologyRPCProvider(dataBroker, encryptionService, topologyId));
139     }
140
141     @Override
142     public void onDataTreeChanged(final Collection<DataTreeModification<Node>> changes) {
143         for (final DataTreeModification<Node> change : changes) {
144             final DataObjectModification<Node> rootNode = change.getRootNode();
145             final InstanceIdentifier<Node> dataModifIdent = change.getRootPath().getRootIdentifier();
146             final NodeId nodeId = NetconfTopologyUtils.getNodeId(rootNode.getIdentifier());
147             switch (rootNode.getModificationType()) {
148                 case SUBTREE_MODIFIED:
149                     LOG.debug("Config for node {} updated", nodeId);
150                     refreshNetconfDeviceContext(dataModifIdent, rootNode.getDataAfter());
151                     break;
152                 case WRITE:
153                     if (contexts.containsKey(dataModifIdent)) {
154                         LOG.debug("RemoteDevice{{}} was already configured, reconfiguring node...", nodeId);
155                         refreshNetconfDeviceContext(dataModifIdent, rootNode.getDataAfter());
156                     } else {
157                         LOG.debug("Config for node {} created", nodeId);
158                         startNetconfDeviceContext(dataModifIdent, rootNode.getDataAfter());
159                     }
160                     break;
161                 case DELETE:
162                     LOG.debug("Config for node {} deleted", nodeId);
163                     stopNetconfDeviceContext(dataModifIdent);
164                     break;
165                 default:
166                     LOG.warn("Unknown operation for {}.", nodeId);
167             }
168         }
169     }
170
171     private void refreshNetconfDeviceContext(final InstanceIdentifier<Node> instanceIdentifier, final Node node) {
172         final NetconfTopologyContext context = contexts.get(instanceIdentifier);
173         context.refresh(createSetup(instanceIdentifier, node));
174     }
175
176     // ClusterSingletonServiceRegistration registerClusterSingletonService method throws a Runtime exception if there
177     // are problems with registration and client has to deal with it. Only thing we can do if this error occurs is to
178     // retry registration several times and log the error.
179     // TODO change to a specific documented Exception when changed in ClusterSingletonServiceProvider
180     @SuppressWarnings("checkstyle:IllegalCatch")
181     private void startNetconfDeviceContext(final InstanceIdentifier<Node> instanceIdentifier, final Node node) {
182         final NetconfNode netconfNode = node.augmentation(NetconfNode.class);
183         requireNonNull(netconfNode);
184         requireNonNull(netconfNode.getHost());
185         requireNonNull(netconfNode.getHost().getIpAddress());
186
187         final Timeout actorResponseWaitTime = Timeout.create(
188                 Duration.ofSeconds(netconfNode.getActorResponseWaitTime().toJava()));
189
190         final ServiceGroupIdentifier serviceGroupIdent =
191                 ServiceGroupIdentifier.create(instanceIdentifier.toString());
192
193         final NetconfTopologyContext newNetconfTopologyContext = newNetconfTopologyContext(
194             createSetup(instanceIdentifier, node), serviceGroupIdent, actorResponseWaitTime, deviceActionFactory);
195
196         int tries = 3;
197         while (true) {
198             try {
199                 final ClusterSingletonServiceRegistration clusterSingletonServiceRegistration =
200                         clusterSingletonServiceProvider.registerClusterSingletonService(newNetconfTopologyContext);
201                 clusterRegistrations.put(instanceIdentifier, clusterSingletonServiceRegistration);
202                 contexts.put(instanceIdentifier, newNetconfTopologyContext);
203                 break;
204             } catch (final RuntimeException e) {
205                 LOG.warn("Unable to register cluster singleton service {}, trying again", newNetconfTopologyContext, e);
206
207                 if (--tries <= 0) {
208                     LOG.error("Unable to register cluster singleton service {} - done trying, closing topology context",
209                             newNetconfTopologyContext, e);
210                     close(newNetconfTopologyContext);
211                     break;
212                 }
213             }
214         }
215     }
216
217     private void stopNetconfDeviceContext(final InstanceIdentifier<Node> instanceIdentifier) {
218         final NetconfTopologyContext netconfTopologyContext = contexts.remove(instanceIdentifier);
219         if (netconfTopologyContext != null) {
220             close(clusterRegistrations.remove(instanceIdentifier));
221             close(netconfTopologyContext);
222         }
223     }
224
225     @VisibleForTesting
226     protected NetconfTopologyContext newNetconfTopologyContext(final NetconfTopologySetup setup,
227             final ServiceGroupIdentifier serviceGroupIdent, final Timeout actorResponseWaitTime,
228             final DeviceActionFactory deviceActionFact) {
229         return new NetconfTopologyContext(setup, serviceGroupIdent, actorResponseWaitTime, mountPointService,
230             deviceActionFact);
231     }
232
233     @Override
234     public void close() {
235         if (rpcReg != null) {
236             rpcReg.close();
237             rpcReg = null;
238         }
239         if (dataChangeListenerRegistration != null) {
240             dataChangeListenerRegistration.close();
241             dataChangeListenerRegistration = null;
242         }
243
244         contexts.values().forEach(NetconfTopologyManager::close);
245         clusterRegistrations.values().forEach(NetconfTopologyManager::close);
246
247         contexts.clear();
248         clusterRegistrations.clear();
249     }
250
251     @SuppressWarnings("checkstyle:IllegalCatch")
252     private static void close(final AutoCloseable closeable) {
253         try {
254             closeable.close();
255         } catch (Exception e) {
256             LOG.warn("Error closing {}", closeable, e);
257         }
258     }
259
260     /**
261      * Sets the private key path from location specified in configuration file using blueprint.
262      */
263     public void setPrivateKeyPath(final String privateKeyPath) {
264         this.privateKeyPath = privateKeyPath;
265     }
266
267     /**
268      * Sets the private key passphrase from location specified in configuration file using blueprint.
269      */
270     public void setPrivateKeyPassphrase(final String privateKeyPassphrase) {
271         this.privateKeyPassphrase = privateKeyPassphrase;
272     }
273
274     private ListenerRegistration<NetconfTopologyManager> registerDataTreeChangeListener() {
275         final WriteTransaction wtx = dataBroker.newWriteOnlyTransaction();
276         initTopology(wtx, LogicalDatastoreType.CONFIGURATION);
277         initTopology(wtx, LogicalDatastoreType.OPERATIONAL);
278         wtx.commit().addCallback(new FutureCallback<CommitInfo>() {
279             @Override
280             public void onSuccess(final CommitInfo result) {
281                 LOG.debug("topology initialization successful");
282             }
283
284             @Override
285             public void onFailure(final Throwable throwable) {
286                 LOG.error("Unable to initialize netconf-topology", throwable);
287             }
288         }, MoreExecutors.directExecutor());
289
290         LOG.debug("Registering datastore listener");
291         return dataBroker.registerDataTreeChangeListener(DataTreeIdentifier.create(LogicalDatastoreType.CONFIGURATION,
292             NetconfTopologyUtils.createTopologyListPath(topologyId).child(Node.class)), this);
293     }
294
295     private void initTopology(final WriteTransaction wtx, final LogicalDatastoreType datastoreType) {
296         final NetworkTopology networkTopology = new NetworkTopologyBuilder().build();
297         final InstanceIdentifier<NetworkTopology> networkTopologyId =
298                 InstanceIdentifier.builder(NetworkTopology.class).build();
299         wtx.merge(datastoreType, networkTopologyId, networkTopology);
300         final Topology topology = new TopologyBuilder().setTopologyId(new TopologyId(topologyId)).build();
301         wtx.merge(datastoreType, networkTopologyId.child(Topology.class,
302                 new TopologyKey(new TopologyId(topologyId))), topology);
303     }
304
305     private NetconfTopologySetup createSetup(final InstanceIdentifier<Node> instanceIdentifier, final Node node) {
306         final NetconfNode netconfNode = node.augmentation(NetconfNode.class);
307         final RemoteDeviceId deviceId = NetconfTopologyUtils.createRemoteDeviceId(node.getNodeId(), netconfNode);
308
309         final NetconfTopologySetupBuilder builder = NetconfTopologySetupBuilder.create()
310                 .setClusterSingletonServiceProvider(clusterSingletonServiceProvider)
311                 .setBaseSchemas(baseSchemas)
312                 .setDataBroker(dataBroker)
313                 .setInstanceIdentifier(instanceIdentifier)
314                 .setRpcProviderRegistry(rpcProviderRegistry)
315                 .setActionProviderRegistry(actionProviderRegistry)
316                 .setNode(node)
317                 .setActorSystem(actorSystem)
318                 .setEventExecutor(eventExecutor)
319                 .setKeepaliveExecutor(keepaliveExecutor)
320                 .setProcessingExecutor(processingExecutor)
321                 .setTopologyId(topologyId)
322                 .setNetconfClientDispatcher(clientDispatcher)
323                 .setSchemaResourceDTO(resourceManager.getSchemaResources(netconfNode, deviceId))
324                 .setIdleTimeout(writeTxIdleTimeout)
325                 .setPrivateKeyPath(privateKeyPath)
326                 .setPrivateKeyPassphrase(privateKeyPassphrase)
327                 .setEncryptionService(encryptionService);
328
329         return builder.build();
330     }
331 }