Add session-id to the operational datastore
[netconf.git] / apps / netconf-topology-singleton / src / main / java / org / opendaylight / netconf / topology / singleton / impl / NetconfTopologyManager.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.netconf.topology.singleton.impl;
9
10 import static java.util.Objects.requireNonNull;
11
12 import akka.actor.ActorSystem;
13 import akka.util.Timeout;
14 import com.google.common.annotations.VisibleForTesting;
15 import com.google.common.util.concurrent.FutureCallback;
16 import com.google.common.util.concurrent.ListeningExecutorService;
17 import com.google.common.util.concurrent.MoreExecutors;
18 import io.netty.util.concurrent.EventExecutor;
19 import java.time.Duration;
20 import java.util.Collection;
21 import java.util.Map;
22 import java.util.concurrent.ConcurrentHashMap;
23 import java.util.concurrent.ScheduledExecutorService;
24 import org.opendaylight.aaa.encrypt.AAAEncryptionService;
25 import org.opendaylight.controller.cluster.ActorSystemProvider;
26 import org.opendaylight.controller.config.threadpool.ScheduledThreadPool;
27 import org.opendaylight.controller.config.threadpool.ThreadPool;
28 import org.opendaylight.mdsal.binding.api.ClusteredDataTreeChangeListener;
29 import org.opendaylight.mdsal.binding.api.DataBroker;
30 import org.opendaylight.mdsal.binding.api.DataObjectModification;
31 import org.opendaylight.mdsal.binding.api.DataTreeIdentifier;
32 import org.opendaylight.mdsal.binding.api.DataTreeModification;
33 import org.opendaylight.mdsal.binding.api.RpcProviderService;
34 import org.opendaylight.mdsal.binding.api.WriteTransaction;
35 import org.opendaylight.mdsal.common.api.CommitInfo;
36 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
37 import org.opendaylight.mdsal.dom.api.DOMActionProviderService;
38 import org.opendaylight.mdsal.dom.api.DOMMountPointService;
39 import org.opendaylight.mdsal.dom.api.DOMRpcProviderService;
40 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
41 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceRegistration;
42 import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier;
43 import org.opendaylight.netconf.client.NetconfClientDispatcher;
44 import org.opendaylight.netconf.sal.connect.api.DeviceActionFactory;
45 import org.opendaylight.netconf.sal.connect.api.RemoteDeviceId;
46 import org.opendaylight.netconf.sal.connect.api.SchemaResourceManager;
47 import org.opendaylight.netconf.sal.connect.netconf.schema.mapping.BaseNetconfSchemas;
48 import org.opendaylight.netconf.topology.singleton.api.NetconfTopologySingletonService;
49 import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup;
50 import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup.NetconfTopologySetupBuilder;
51 import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologyUtils;
52 import org.opendaylight.netconf.topology.spi.NetconfNodeUtils;
53 import org.opendaylight.netconf.topology.spi.NetconfTopologyRPCProvider;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev221225.NetconfNode;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev221225.NetconfNodeTopologyService;
56 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.topology.singleton.config.rev170419.Config;
57 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology;
58 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
59 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.TopologyId;
60 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
61 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyBuilder;
62 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey;
63 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
64 import org.opendaylight.yangtools.concepts.ListenerRegistration;
65 import org.opendaylight.yangtools.concepts.Registration;
66 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
67 import org.slf4j.Logger;
68 import org.slf4j.LoggerFactory;
69
70 public class NetconfTopologyManager
71         implements ClusteredDataTreeChangeListener<Node>, NetconfTopologySingletonService, AutoCloseable {
72
73     private static final Logger LOG = LoggerFactory.getLogger(NetconfTopologyManager.class);
74
75     private final Map<InstanceIdentifier<Node>, NetconfTopologyContext> contexts = new ConcurrentHashMap<>();
76     private final Map<InstanceIdentifier<Node>, ClusterSingletonServiceRegistration>
77             clusterRegistrations = new ConcurrentHashMap<>();
78
79     private final BaseNetconfSchemas baseSchemas;
80     private final DataBroker dataBroker;
81     private final DOMRpcProviderService rpcProviderRegistry;
82     private final DOMActionProviderService actionProviderRegistry;
83     private final ClusterSingletonServiceProvider clusterSingletonServiceProvider;
84     private final ScheduledExecutorService keepaliveExecutor;
85     private final ListeningExecutorService processingExecutor;
86     private final ActorSystem actorSystem;
87     private final EventExecutor eventExecutor;
88     private final NetconfClientDispatcher clientDispatcher;
89     private final String topologyId;
90     private final Duration writeTxIdleTimeout;
91     private final DOMMountPointService mountPointService;
92     private final AAAEncryptionService encryptionService;
93     private final RpcProviderService rpcProviderService;
94     private final DeviceActionFactory deviceActionFactory;
95     private final SchemaResourceManager resourceManager;
96
97     private ListenerRegistration<NetconfTopologyManager> dataChangeListenerRegistration;
98     private Registration rpcReg;
99     private String privateKeyPath;
100     private String privateKeyPassphrase;
101
102     public NetconfTopologyManager(final BaseNetconfSchemas baseSchemas, final DataBroker dataBroker,
103                                   final DOMRpcProviderService rpcProviderRegistry,
104                                   final DOMActionProviderService actionProviderService,
105                                   final ClusterSingletonServiceProvider clusterSingletonServiceProvider,
106                                   final ScheduledThreadPool keepaliveExecutor, final ThreadPool processingExecutor,
107                                   final ActorSystemProvider actorSystemProvider,
108                                   final EventExecutor eventExecutor, final NetconfClientDispatcher clientDispatcher,
109                                   final String topologyId, final Config config,
110                                   final DOMMountPointService mountPointService,
111                                   final AAAEncryptionService encryptionService,
112                                   final RpcProviderService rpcProviderService,
113                                   final DeviceActionFactory deviceActionFactory,
114                                   final SchemaResourceManager resourceManager) {
115         this.baseSchemas = requireNonNull(baseSchemas);
116         this.dataBroker = requireNonNull(dataBroker);
117         this.rpcProviderRegistry = requireNonNull(rpcProviderRegistry);
118         actionProviderRegistry = requireNonNull(actionProviderService);
119         this.clusterSingletonServiceProvider = requireNonNull(clusterSingletonServiceProvider);
120         this.keepaliveExecutor = keepaliveExecutor.getExecutor();
121         this.processingExecutor = MoreExecutors.listeningDecorator(processingExecutor.getExecutor());
122         actorSystem = requireNonNull(actorSystemProvider).getActorSystem();
123         this.eventExecutor = requireNonNull(eventExecutor);
124         this.clientDispatcher = requireNonNull(clientDispatcher);
125         this.topologyId = requireNonNull(topologyId);
126         writeTxIdleTimeout = Duration.ofSeconds(config.getWriteTransactionIdleTimeout().toJava());
127         this.mountPointService = mountPointService;
128         this.encryptionService = requireNonNull(encryptionService);
129         this.rpcProviderService = requireNonNull(rpcProviderService);
130         this.deviceActionFactory = requireNonNull(deviceActionFactory);
131         this.resourceManager = requireNonNull(resourceManager);
132     }
133
134     // Blueprint init method
135     public void init() {
136         dataChangeListenerRegistration = registerDataTreeChangeListener();
137         rpcReg = rpcProviderService.registerRpcImplementation(NetconfNodeTopologyService.class,
138             new NetconfTopologyRPCProvider(dataBroker, encryptionService, topologyId));
139     }
140
141     @Override
142     public void onDataTreeChanged(final Collection<DataTreeModification<Node>> changes) {
143         for (final DataTreeModification<Node> change : changes) {
144             final DataObjectModification<Node> rootNode = change.getRootNode();
145             final InstanceIdentifier<Node> dataModifIdent = change.getRootPath().getRootIdentifier();
146             final NodeId nodeId = NetconfTopologyUtils.getNodeId(rootNode.getIdentifier());
147             switch (rootNode.getModificationType()) {
148                 case SUBTREE_MODIFIED:
149                     LOG.debug("Config for node {} updated", nodeId);
150                     refreshNetconfDeviceContext(dataModifIdent, rootNode.getDataAfter());
151                     break;
152                 case WRITE:
153                     if (contexts.containsKey(dataModifIdent)) {
154                         LOG.debug("RemoteDevice{{}} was already configured, reconfiguring node...", nodeId);
155                         refreshNetconfDeviceContext(dataModifIdent, rootNode.getDataAfter());
156                     } else {
157                         LOG.debug("Config for node {} created", nodeId);
158                         startNetconfDeviceContext(dataModifIdent, rootNode.getDataAfter());
159                     }
160                     break;
161                 case DELETE:
162                     LOG.debug("Config for node {} deleted", nodeId);
163                     stopNetconfDeviceContext(dataModifIdent);
164                     break;
165                 default:
166                     LOG.warn("Unknown operation for {}.", nodeId);
167             }
168         }
169     }
170
171     private void refreshNetconfDeviceContext(final InstanceIdentifier<Node> instanceIdentifier, final Node node) {
172         final NetconfTopologyContext context = contexts.get(instanceIdentifier);
173         context.refresh(createSetup(instanceIdentifier, node));
174     }
175
176     // ClusterSingletonServiceRegistration registerClusterSingletonService method throws a Runtime exception if there
177     // are problems with registration and client has to deal with it. Only thing we can do if this error occurs is to
178     // retry registration several times and log the error.
179     // TODO change to a specific documented Exception when changed in ClusterSingletonServiceProvider
180     @SuppressWarnings("checkstyle:IllegalCatch")
181     private void startNetconfDeviceContext(final InstanceIdentifier<Node> instanceIdentifier, final Node node) {
182         final NetconfNode netconfNode = requireNonNull(node.augmentation(NetconfNode.class));
183
184         final Timeout actorResponseWaitTime = Timeout.create(
185                 Duration.ofSeconds(netconfNode.getActorResponseWaitTime().toJava()));
186
187         final ServiceGroupIdentifier serviceGroupIdent =
188                 ServiceGroupIdentifier.create(instanceIdentifier.toString());
189
190         final NetconfTopologyContext newNetconfTopologyContext = newNetconfTopologyContext(
191             createSetup(instanceIdentifier, node), serviceGroupIdent, actorResponseWaitTime, deviceActionFactory);
192
193         int tries = 3;
194         while (true) {
195             try {
196                 final ClusterSingletonServiceRegistration clusterSingletonServiceRegistration =
197                         clusterSingletonServiceProvider.registerClusterSingletonService(newNetconfTopologyContext);
198                 clusterRegistrations.put(instanceIdentifier, clusterSingletonServiceRegistration);
199                 contexts.put(instanceIdentifier, newNetconfTopologyContext);
200                 break;
201             } catch (final RuntimeException e) {
202                 LOG.warn("Unable to register cluster singleton service {}, trying again", newNetconfTopologyContext, e);
203
204                 if (--tries <= 0) {
205                     LOG.error("Unable to register cluster singleton service {} - done trying, closing topology context",
206                             newNetconfTopologyContext, e);
207                     close(newNetconfTopologyContext);
208                     break;
209                 }
210             }
211         }
212     }
213
214     private void stopNetconfDeviceContext(final InstanceIdentifier<Node> instanceIdentifier) {
215         final NetconfTopologyContext netconfTopologyContext = contexts.remove(instanceIdentifier);
216         if (netconfTopologyContext != null) {
217             close(clusterRegistrations.remove(instanceIdentifier));
218             close(netconfTopologyContext);
219         }
220     }
221
222     @VisibleForTesting
223     protected NetconfTopologyContext newNetconfTopologyContext(final NetconfTopologySetup setup,
224             final ServiceGroupIdentifier serviceGroupIdent, final Timeout actorResponseWaitTime,
225             final DeviceActionFactory deviceActionFact) {
226         return new NetconfTopologyContext(setup, serviceGroupIdent, actorResponseWaitTime, mountPointService,
227             deviceActionFact);
228     }
229
230     @Override
231     public void close() {
232         if (rpcReg != null) {
233             rpcReg.close();
234             rpcReg = null;
235         }
236         if (dataChangeListenerRegistration != null) {
237             dataChangeListenerRegistration.close();
238             dataChangeListenerRegistration = null;
239         }
240
241         contexts.values().forEach(NetconfTopologyManager::close);
242         clusterRegistrations.values().forEach(NetconfTopologyManager::close);
243
244         contexts.clear();
245         clusterRegistrations.clear();
246     }
247
248     @SuppressWarnings("checkstyle:IllegalCatch")
249     private static void close(final AutoCloseable closeable) {
250         try {
251             closeable.close();
252         } catch (Exception e) {
253             LOG.warn("Error closing {}", closeable, e);
254         }
255     }
256
257     /**
258      * Sets the private key path from location specified in configuration file using blueprint.
259      */
260     public void setPrivateKeyPath(final String privateKeyPath) {
261         this.privateKeyPath = privateKeyPath;
262     }
263
264     /**
265      * Sets the private key passphrase from location specified in configuration file using blueprint.
266      */
267     public void setPrivateKeyPassphrase(final String privateKeyPassphrase) {
268         this.privateKeyPassphrase = privateKeyPassphrase;
269     }
270
271     private ListenerRegistration<NetconfTopologyManager> registerDataTreeChangeListener() {
272         final WriteTransaction wtx = dataBroker.newWriteOnlyTransaction();
273         // FIXME: how does this play out with lifecycle? In a cluster, someone needs to ensure this call happens, but
274         //        also we need to to make sure config -> oper is properly synchronized. Non-clustered case relies on
275         //        oper being transient and perhaps on a put() instead, how do we handle that in the clustered case?
276         wtx.merge(LogicalDatastoreType.OPERATIONAL, InstanceIdentifier.builder(NetworkTopology.class)
277             .child(Topology.class, new TopologyKey(new TopologyId(topologyId)))
278             .build(), new TopologyBuilder().setTopologyId(new TopologyId(topologyId)).build());
279         wtx.commit().addCallback(new FutureCallback<CommitInfo>() {
280             @Override
281             public void onSuccess(final CommitInfo result) {
282                 LOG.debug("topology initialization successful");
283             }
284
285             @Override
286             public void onFailure(final Throwable throwable) {
287                 LOG.error("Unable to initialize netconf-topology", throwable);
288             }
289         }, MoreExecutors.directExecutor());
290
291         LOG.debug("Registering datastore listener");
292         return dataBroker.registerDataTreeChangeListener(DataTreeIdentifier.create(LogicalDatastoreType.CONFIGURATION,
293             NetconfTopologyUtils.createTopologyListPath(topologyId).child(Node.class)), this);
294     }
295
296     private NetconfTopologySetup createSetup(final InstanceIdentifier<Node> instanceIdentifier, final Node node) {
297         final NetconfNode netconfNode = node.augmentation(NetconfNode.class);
298         final RemoteDeviceId deviceId = NetconfNodeUtils.toRemoteDeviceId(node.getNodeId(), netconfNode);
299
300         return NetconfTopologySetupBuilder.create()
301                 .setClusterSingletonServiceProvider(clusterSingletonServiceProvider)
302                 .setBaseSchemas(baseSchemas)
303                 .setDataBroker(dataBroker)
304                 .setInstanceIdentifier(instanceIdentifier)
305                 .setRpcProviderRegistry(rpcProviderRegistry)
306                 .setActionProviderRegistry(actionProviderRegistry)
307                 .setNode(node)
308                 .setActorSystem(actorSystem)
309                 .setEventExecutor(eventExecutor)
310                 .setKeepaliveExecutor(keepaliveExecutor)
311                 .setProcessingExecutor(processingExecutor)
312                 .setTopologyId(topologyId)
313                 .setNetconfClientDispatcher(clientDispatcher)
314                 .setSchemaResourceDTO(resourceManager.getSchemaResources(netconfNode.getSchemaCacheDirectory(),
315                     deviceId))
316                 .setIdleTimeout(writeTxIdleTimeout)
317                 .setPrivateKeyPath(privateKeyPath)
318                 .setPrivateKeyPassphrase(privateKeyPassphrase)
319                 .setEncryptionService(encryptionService)
320                 .build();
321     }
322 }