Remove blueprint from netconf-topology-singleton
[netconf.git] / apps / netconf-topology-singleton / src / main / java / org / opendaylight / netconf / topology / singleton / impl / NetconfTopologyManager.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.netconf.topology.singleton.impl;
9
10 import static java.util.Objects.requireNonNull;
11
12 import akka.actor.ActorSystem;
13 import akka.util.Timeout;
14 import com.google.common.annotations.VisibleForTesting;
15 import com.google.common.util.concurrent.FutureCallback;
16 import com.google.common.util.concurrent.MoreExecutors;
17 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
18 import io.netty.util.concurrent.EventExecutor;
19 import java.time.Duration;
20 import java.util.Collection;
21 import java.util.Map;
22 import java.util.concurrent.ConcurrentHashMap;
23 import java.util.concurrent.Executor;
24 import java.util.concurrent.ScheduledExecutorService;
25 import javax.annotation.PreDestroy;
26 import javax.inject.Inject;
27 import javax.inject.Singleton;
28 import org.opendaylight.aaa.encrypt.AAAEncryptionService;
29 import org.opendaylight.controller.cluster.ActorSystemProvider;
30 import org.opendaylight.controller.config.threadpool.ScheduledThreadPool;
31 import org.opendaylight.controller.config.threadpool.ThreadPool;
32 import org.opendaylight.mdsal.binding.api.ClusteredDataTreeChangeListener;
33 import org.opendaylight.mdsal.binding.api.DataBroker;
34 import org.opendaylight.mdsal.binding.api.DataObjectModification;
35 import org.opendaylight.mdsal.binding.api.DataTreeIdentifier;
36 import org.opendaylight.mdsal.binding.api.DataTreeModification;
37 import org.opendaylight.mdsal.binding.api.RpcProviderService;
38 import org.opendaylight.mdsal.binding.api.WriteTransaction;
39 import org.opendaylight.mdsal.common.api.CommitInfo;
40 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
41 import org.opendaylight.mdsal.dom.api.DOMMountPointService;
42 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
43 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceRegistration;
44 import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier;
45 import org.opendaylight.netconf.client.NetconfClientDispatcher;
46 import org.opendaylight.netconf.client.mdsal.api.BaseNetconfSchemas;
47 import org.opendaylight.netconf.client.mdsal.api.DeviceActionFactory;
48 import org.opendaylight.netconf.client.mdsal.api.RemoteDeviceId;
49 import org.opendaylight.netconf.client.mdsal.api.SchemaResourceManager;
50 import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup;
51 import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup.NetconfTopologySetupBuilder;
52 import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologyUtils;
53 import org.opendaylight.netconf.topology.spi.NetconfClientConfigurationBuilderFactory;
54 import org.opendaylight.netconf.topology.spi.NetconfNodeUtils;
55 import org.opendaylight.netconf.topology.spi.NetconfTopologyRPCProvider;
56 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev221225.NetconfNode;
57 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev221225.NetconfNodeTopologyService;
58 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology;
59 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
60 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.TopologyId;
61 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
62 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyBuilder;
63 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey;
64 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
65 import org.opendaylight.yangtools.concepts.ListenerRegistration;
66 import org.opendaylight.yangtools.concepts.Registration;
67 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
68 import org.opendaylight.yangtools.yang.common.Uint16;
69 import org.osgi.service.component.annotations.Activate;
70 import org.osgi.service.component.annotations.Component;
71 import org.osgi.service.component.annotations.Deactivate;
72 import org.osgi.service.component.annotations.Reference;
73 import org.osgi.service.metatype.annotations.AttributeDefinition;
74 import org.osgi.service.metatype.annotations.Designate;
75 import org.osgi.service.metatype.annotations.ObjectClassDefinition;
76 import org.slf4j.Logger;
77 import org.slf4j.LoggerFactory;
78
79 @Singleton
80 @Component(service = { }, configurationPid = "org.opendaylight.netconf.topology.singleton")
81 @Designate(ocd = NetconfTopologyManager.Configuration.class)
82 // Non-final for testing
83 public class NetconfTopologyManager implements ClusteredDataTreeChangeListener<Node>, AutoCloseable {
84     @ObjectClassDefinition
85     public @interface Configuration {
86         @AttributeDefinition(min = "1", description = "Name of the Network Topology instance to manage")
87         String topology$_$id() default "topology-netconf";
88
89         @AttributeDefinition(min = "0", max = "65535",
90             description = "Idle time in seconds after which write transaction is cancelled automatically. If 0, "
91                 + "automatic cancellation is turned off.")
92         int write$_$transaction$_$idle$_$timeout() default 0;
93     }
94
95     private static final Logger LOG = LoggerFactory.getLogger(NetconfTopologyManager.class);
96
97     private final Map<InstanceIdentifier<Node>, NetconfTopologyContext> contexts = new ConcurrentHashMap<>();
98     private final Map<InstanceIdentifier<Node>, ClusterSingletonServiceRegistration>
99             clusterRegistrations = new ConcurrentHashMap<>();
100
101     private final BaseNetconfSchemas baseSchemas;
102     private final DataBroker dataBroker;
103     private final ClusterSingletonServiceProvider clusterSingletonServiceProvider;
104     private final ScheduledExecutorService keepaliveExecutor;
105     private final Executor processingExecutor;
106     private final ActorSystem actorSystem;
107     private final EventExecutor eventExecutor;
108     private final NetconfClientDispatcher clientDispatcher;
109     private final String topologyId;
110     private final Duration writeTxIdleTimeout;
111     private final DOMMountPointService mountPointService;
112     private final AAAEncryptionService encryptionService;
113     private final RpcProviderService rpcProviderService;
114     private final DeviceActionFactory deviceActionFactory;
115     private final NetconfClientConfigurationBuilderFactory builderFactory;
116     private final SchemaResourceManager resourceManager;
117
118     private ListenerRegistration<NetconfTopologyManager> dataChangeListenerRegistration;
119     private Registration rpcReg;
120
121     @Activate
122     public NetconfTopologyManager(@Reference final BaseNetconfSchemas baseSchemas,
123             @Reference final DataBroker dataBroker,
124             @Reference final ClusterSingletonServiceProvider clusterSingletonServiceProvider,
125             @Reference(target = "(type=global-netconf-ssh-scheduled-executor)")
126                 final ScheduledThreadPool keepaliveExecutor,
127             @Reference(target = "(type=global-netconf-processing-executor)") final ThreadPool processingExecutor,
128             @Reference final ActorSystemProvider actorSystemProvider,
129             @Reference(target = "(type=global-event-executor)") final EventExecutor eventExecutor,
130             @Reference(target = "(type=netconf-client-dispatcher)") final NetconfClientDispatcher clientDispatcher,
131             @Reference final DOMMountPointService mountPointService,
132             @Reference final AAAEncryptionService encryptionService,
133             @Reference final RpcProviderService rpcProviderService,
134             @Reference final DeviceActionFactory deviceActionFactory,
135             @Reference final SchemaResourceManager resourceManager,
136             @Reference final NetconfClientConfigurationBuilderFactory builderFactory,
137             final Configuration configuration) {
138         this(baseSchemas, dataBroker, clusterSingletonServiceProvider, keepaliveExecutor.getExecutor(),
139             processingExecutor.getExecutor(), actorSystemProvider.getActorSystem(), eventExecutor, clientDispatcher,
140             mountPointService, encryptionService, rpcProviderService, deviceActionFactory, resourceManager,
141             builderFactory, configuration.topology$_$id(),
142             Uint16.valueOf(configuration.write$_$transaction$_$idle$_$timeout()));
143     }
144
145     @Inject
146     public NetconfTopologyManager(final BaseNetconfSchemas baseSchemas, final DataBroker dataBroker,
147             final ClusterSingletonServiceProvider clusterSingletonServiceProvider,
148             final ScheduledThreadPool keepaliveExecutor, final ThreadPool processingExecutor,
149             final ActorSystemProvider actorSystemProvider, final EventExecutor eventExecutor,
150             final NetconfClientDispatcher clientDispatcher, final DOMMountPointService mountPointService,
151             final AAAEncryptionService encryptionService, final RpcProviderService rpcProviderService,
152             final DeviceActionFactory deviceActionFactory, final SchemaResourceManager resourceManager,
153             final NetconfClientConfigurationBuilderFactory builderFactory) {
154         this(baseSchemas, dataBroker, clusterSingletonServiceProvider, keepaliveExecutor.getExecutor(),
155             processingExecutor.getExecutor(), actorSystemProvider.getActorSystem(), eventExecutor, clientDispatcher,
156             mountPointService, encryptionService, rpcProviderService, deviceActionFactory, resourceManager,
157             builderFactory, NetconfNodeUtils.DEFAULT_TOPOLOGY_NAME, Uint16.ZERO);
158     }
159
160     @SuppressFBWarnings(value = "MC_OVERRIDABLE_METHOD_CALL_IN_CONSTRUCTOR",
161         justification = "Non-final for mocking, but we register for DTCL and that leaks 'this'")
162     public NetconfTopologyManager(final BaseNetconfSchemas baseSchemas, final DataBroker dataBroker,
163             final ClusterSingletonServiceProvider clusterSingletonServiceProvider,
164             final ScheduledExecutorService keepaliveExecutor, final Executor processingExecutor,
165             final ActorSystem actorSystem, final EventExecutor eventExecutor,
166             final NetconfClientDispatcher clientDispatcher, final DOMMountPointService mountPointService,
167             final AAAEncryptionService encryptionService, final RpcProviderService rpcProviderService,
168             final DeviceActionFactory deviceActionFactory, final SchemaResourceManager resourceManager,
169             final NetconfClientConfigurationBuilderFactory builderFactory, final String topologyId,
170             final Uint16 writeTransactionIdleTimeout) {
171         this.baseSchemas = requireNonNull(baseSchemas);
172         this.dataBroker = requireNonNull(dataBroker);
173         this.clusterSingletonServiceProvider = requireNonNull(clusterSingletonServiceProvider);
174         this.keepaliveExecutor = requireNonNull(keepaliveExecutor);
175         this.processingExecutor = requireNonNull(processingExecutor);
176         this.actorSystem = requireNonNull(actorSystem);
177         this.eventExecutor = requireNonNull(eventExecutor);
178         this.clientDispatcher = requireNonNull(clientDispatcher);
179         this.topologyId = requireNonNull(topologyId);
180         writeTxIdleTimeout = Duration.ofSeconds(writeTransactionIdleTimeout.toJava());
181         this.mountPointService = mountPointService;
182         this.encryptionService = requireNonNull(encryptionService);
183         this.rpcProviderService = requireNonNull(rpcProviderService);
184         this.deviceActionFactory = requireNonNull(deviceActionFactory);
185         this.resourceManager = requireNonNull(resourceManager);
186         this.builderFactory = requireNonNull(builderFactory);
187
188         dataChangeListenerRegistration = registerDataTreeChangeListener();
189         rpcReg = rpcProviderService.registerRpcImplementation(NetconfNodeTopologyService.class,
190             new NetconfTopologyRPCProvider(dataBroker, encryptionService, topologyId));
191     }
192
193     @Override
194     public void onDataTreeChanged(final Collection<DataTreeModification<Node>> changes) {
195         for (final DataTreeModification<Node> change : changes) {
196             final DataObjectModification<Node> rootNode = change.getRootNode();
197             final InstanceIdentifier<Node> dataModifIdent = change.getRootPath().getRootIdentifier();
198             final NodeId nodeId = NetconfTopologyUtils.getNodeId(rootNode.getIdentifier());
199             switch (rootNode.getModificationType()) {
200                 case SUBTREE_MODIFIED:
201                     LOG.debug("Config for node {} updated", nodeId);
202                     refreshNetconfDeviceContext(dataModifIdent, rootNode.getDataAfter());
203                     break;
204                 case WRITE:
205                     if (contexts.containsKey(dataModifIdent)) {
206                         LOG.debug("RemoteDevice{{}} was already configured, reconfiguring node...", nodeId);
207                         refreshNetconfDeviceContext(dataModifIdent, rootNode.getDataAfter());
208                     } else {
209                         LOG.debug("Config for node {} created", nodeId);
210                         startNetconfDeviceContext(dataModifIdent, rootNode.getDataAfter());
211                     }
212                     break;
213                 case DELETE:
214                     LOG.debug("Config for node {} deleted", nodeId);
215                     stopNetconfDeviceContext(dataModifIdent);
216                     break;
217                 default:
218                     LOG.warn("Unknown operation for {}.", nodeId);
219             }
220         }
221     }
222
223     private void refreshNetconfDeviceContext(final InstanceIdentifier<Node> instanceIdentifier, final Node node) {
224         final NetconfTopologyContext context = contexts.get(instanceIdentifier);
225         context.refresh(createSetup(instanceIdentifier, node));
226     }
227
228     // ClusterSingletonServiceRegistration registerClusterSingletonService method throws a Runtime exception if there
229     // are problems with registration and client has to deal with it. Only thing we can do if this error occurs is to
230     // retry registration several times and log the error.
231     // TODO change to a specific documented Exception when changed in ClusterSingletonServiceProvider
232     @SuppressWarnings("checkstyle:IllegalCatch")
233     private void startNetconfDeviceContext(final InstanceIdentifier<Node> instanceIdentifier, final Node node) {
234         final NetconfNode netconfNode = requireNonNull(node.augmentation(NetconfNode.class));
235
236         final Timeout actorResponseWaitTime = Timeout.create(
237                 Duration.ofSeconds(netconfNode.getActorResponseWaitTime().toJava()));
238
239         final ServiceGroupIdentifier serviceGroupIdent =
240                 ServiceGroupIdentifier.create(instanceIdentifier.toString());
241
242         final NetconfTopologyContext newNetconfTopologyContext = newNetconfTopologyContext(
243             createSetup(instanceIdentifier, node), serviceGroupIdent, actorResponseWaitTime, deviceActionFactory);
244
245         int tries = 3;
246         while (true) {
247             try {
248                 final ClusterSingletonServiceRegistration clusterSingletonServiceRegistration =
249                         clusterSingletonServiceProvider.registerClusterSingletonService(newNetconfTopologyContext);
250                 clusterRegistrations.put(instanceIdentifier, clusterSingletonServiceRegistration);
251                 contexts.put(instanceIdentifier, newNetconfTopologyContext);
252                 break;
253             } catch (final RuntimeException e) {
254                 LOG.warn("Unable to register cluster singleton service {}, trying again", newNetconfTopologyContext, e);
255
256                 if (--tries <= 0) {
257                     LOG.error("Unable to register cluster singleton service {} - done trying, closing topology context",
258                             newNetconfTopologyContext, e);
259                     close(newNetconfTopologyContext);
260                     break;
261                 }
262             }
263         }
264     }
265
266     private void stopNetconfDeviceContext(final InstanceIdentifier<Node> instanceIdentifier) {
267         final NetconfTopologyContext netconfTopologyContext = contexts.remove(instanceIdentifier);
268         if (netconfTopologyContext != null) {
269             close(clusterRegistrations.remove(instanceIdentifier));
270             close(netconfTopologyContext);
271         }
272     }
273
274     @VisibleForTesting
275     protected NetconfTopologyContext newNetconfTopologyContext(final NetconfTopologySetup setup,
276             final ServiceGroupIdentifier serviceGroupIdent, final Timeout actorResponseWaitTime,
277             final DeviceActionFactory deviceActionFact) {
278         return new NetconfTopologyContext(resourceManager, mountPointService, builderFactory, deviceActionFactory,
279             actorResponseWaitTime, serviceGroupIdent, setup);
280     }
281
282     @PreDestroy
283     @Deactivate
284     @Override
285     public void close() {
286         if (rpcReg != null) {
287             rpcReg.close();
288             rpcReg = null;
289         }
290         if (dataChangeListenerRegistration != null) {
291             dataChangeListenerRegistration.close();
292             dataChangeListenerRegistration = null;
293         }
294
295         contexts.values().forEach(NetconfTopologyManager::close);
296         clusterRegistrations.values().forEach(NetconfTopologyManager::close);
297
298         contexts.clear();
299         clusterRegistrations.clear();
300     }
301
302     @SuppressWarnings("checkstyle:IllegalCatch")
303     private static void close(final AutoCloseable closeable) {
304         try {
305             closeable.close();
306         } catch (Exception e) {
307             LOG.warn("Error closing {}", closeable, e);
308         }
309     }
310
311     private ListenerRegistration<NetconfTopologyManager> registerDataTreeChangeListener() {
312         final WriteTransaction wtx = dataBroker.newWriteOnlyTransaction();
313         // FIXME: how does this play out with lifecycle? In a cluster, someone needs to ensure this call happens, but
314         //        also we need to to make sure config -> oper is properly synchronized. Non-clustered case relies on
315         //        oper being transient and perhaps on a put() instead, how do we handle that in the clustered case?
316         wtx.merge(LogicalDatastoreType.OPERATIONAL, InstanceIdentifier.builder(NetworkTopology.class)
317             .child(Topology.class, new TopologyKey(new TopologyId(topologyId)))
318             .build(), new TopologyBuilder().setTopologyId(new TopologyId(topologyId)).build());
319         wtx.commit().addCallback(new FutureCallback<CommitInfo>() {
320             @Override
321             public void onSuccess(final CommitInfo result) {
322                 LOG.debug("topology initialization successful");
323             }
324
325             @Override
326             public void onFailure(final Throwable throwable) {
327                 LOG.error("Unable to initialize netconf-topology", throwable);
328             }
329         }, MoreExecutors.directExecutor());
330
331         LOG.debug("Registering datastore listener");
332         return dataBroker.registerDataTreeChangeListener(DataTreeIdentifier.create(LogicalDatastoreType.CONFIGURATION,
333             NetconfTopologyUtils.createTopologyListPath(topologyId).child(Node.class)), this);
334     }
335
336     private NetconfTopologySetup createSetup(final InstanceIdentifier<Node> instanceIdentifier, final Node node) {
337         final NetconfNode netconfNode = node.augmentation(NetconfNode.class);
338         final RemoteDeviceId deviceId = NetconfNodeUtils.toRemoteDeviceId(node.getNodeId(), netconfNode);
339
340         return NetconfTopologySetupBuilder.create()
341                 .setClusterSingletonServiceProvider(clusterSingletonServiceProvider)
342                 .setBaseSchemas(baseSchemas)
343                 .setDataBroker(dataBroker)
344                 .setInstanceIdentifier(instanceIdentifier)
345                 .setNode(node)
346                 .setActorSystem(actorSystem)
347                 .setEventExecutor(eventExecutor)
348                 .setKeepaliveExecutor(keepaliveExecutor)
349                 .setProcessingExecutor(processingExecutor)
350                 .setTopologyId(topologyId)
351                 .setNetconfClientDispatcher(clientDispatcher)
352                 .setSchemaResourceDTO(resourceManager.getSchemaResources(netconfNode.getSchemaCacheDirectory(),
353                     deviceId))
354                 .setIdleTimeout(writeTxIdleTimeout)
355                 .build();
356     }
357 }