Migrate netconf-topology to new transport
[netconf.git] / apps / netconf-topology-singleton / src / main / java / org / opendaylight / netconf / topology / singleton / impl / NetconfTopologyManager.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.netconf.topology.singleton.impl;
9
10 import static java.util.Objects.requireNonNull;
11
12 import akka.actor.ActorSystem;
13 import akka.util.Timeout;
14 import com.google.common.annotations.VisibleForTesting;
15 import com.google.common.util.concurrent.FutureCallback;
16 import com.google.common.util.concurrent.MoreExecutors;
17 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
18 import io.netty.util.concurrent.EventExecutor;
19 import java.time.Duration;
20 import java.util.Collection;
21 import java.util.Map;
22 import java.util.concurrent.ConcurrentHashMap;
23 import java.util.concurrent.Executor;
24 import java.util.concurrent.ScheduledExecutorService;
25 import javax.annotation.PreDestroy;
26 import javax.inject.Inject;
27 import javax.inject.Singleton;
28 import org.opendaylight.aaa.encrypt.AAAEncryptionService;
29 import org.opendaylight.controller.cluster.ActorSystemProvider;
30 import org.opendaylight.controller.config.threadpool.ScheduledThreadPool;
31 import org.opendaylight.controller.config.threadpool.ThreadPool;
32 import org.opendaylight.mdsal.binding.api.ClusteredDataTreeChangeListener;
33 import org.opendaylight.mdsal.binding.api.DataBroker;
34 import org.opendaylight.mdsal.binding.api.DataObjectModification;
35 import org.opendaylight.mdsal.binding.api.DataTreeIdentifier;
36 import org.opendaylight.mdsal.binding.api.DataTreeModification;
37 import org.opendaylight.mdsal.binding.api.RpcProviderService;
38 import org.opendaylight.mdsal.binding.api.WriteTransaction;
39 import org.opendaylight.mdsal.common.api.CommitInfo;
40 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
41 import org.opendaylight.mdsal.dom.api.DOMMountPointService;
42 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
43 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceRegistration;
44 import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier;
45 import org.opendaylight.netconf.client.NetconfClientFactory;
46 import org.opendaylight.netconf.client.mdsal.api.BaseNetconfSchemas;
47 import org.opendaylight.netconf.client.mdsal.api.DeviceActionFactory;
48 import org.opendaylight.netconf.client.mdsal.api.RemoteDeviceId;
49 import org.opendaylight.netconf.client.mdsal.api.SchemaResourceManager;
50 import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup;
51 import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup.NetconfTopologySetupBuilder;
52 import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologyUtils;
53 import org.opendaylight.netconf.topology.spi.NetconfClientConfigurationBuilderFactory;
54 import org.opendaylight.netconf.topology.spi.NetconfNodeUtils;
55 import org.opendaylight.netconf.topology.spi.NetconfTopologyRPCProvider;
56 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev221225.NetconfNode;
57 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology;
58 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
59 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.TopologyId;
60 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
61 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyBuilder;
62 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey;
63 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
64 import org.opendaylight.yangtools.concepts.ListenerRegistration;
65 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
66 import org.opendaylight.yangtools.yang.common.Uint16;
67 import org.osgi.service.component.annotations.Activate;
68 import org.osgi.service.component.annotations.Component;
69 import org.osgi.service.component.annotations.Deactivate;
70 import org.osgi.service.component.annotations.Reference;
71 import org.osgi.service.metatype.annotations.AttributeDefinition;
72 import org.osgi.service.metatype.annotations.Designate;
73 import org.osgi.service.metatype.annotations.ObjectClassDefinition;
74 import org.slf4j.Logger;
75 import org.slf4j.LoggerFactory;
76
77 @Singleton
78 @Component(service = { }, configurationPid = "org.opendaylight.netconf.topology.singleton")
79 @Designate(ocd = NetconfTopologyManager.Configuration.class)
80 // Non-final for testing
81 public class NetconfTopologyManager implements ClusteredDataTreeChangeListener<Node>, AutoCloseable {
82     @ObjectClassDefinition
83     public @interface Configuration {
84         @AttributeDefinition(min = "1", description = "Name of the Network Topology instance to manage")
85         String topology$_$id() default "topology-netconf";
86
87         @AttributeDefinition(min = "0", max = "65535",
88             description = "Idle time in seconds after which write transaction is cancelled automatically. If 0, "
89                 + "automatic cancellation is turned off.")
90         int write$_$transaction$_$idle$_$timeout() default 0;
91     }
92
93     private static final Logger LOG = LoggerFactory.getLogger(NetconfTopologyManager.class);
94
95     private final Map<InstanceIdentifier<Node>, NetconfTopologyContext> contexts = new ConcurrentHashMap<>();
96     private final Map<InstanceIdentifier<Node>, ClusterSingletonServiceRegistration>
97             clusterRegistrations = new ConcurrentHashMap<>();
98
99     private final BaseNetconfSchemas baseSchemas;
100     private final DataBroker dataBroker;
101     private final ClusterSingletonServiceProvider clusterSingletonServiceProvider;
102     private final ScheduledExecutorService keepaliveExecutor;
103     private final Executor processingExecutor;
104     private final ActorSystem actorSystem;
105     private final EventExecutor eventExecutor;
106     private final NetconfClientFactory clientFactory;
107     private final String topologyId;
108     private final Duration writeTxIdleTimeout;
109     private final DOMMountPointService mountPointService;
110     private final DeviceActionFactory deviceActionFactory;
111     private final NetconfClientConfigurationBuilderFactory builderFactory;
112     private final SchemaResourceManager resourceManager;
113
114     private ListenerRegistration<NetconfTopologyManager> dataChangeListenerRegistration;
115     private NetconfTopologyRPCProvider rpcProvider;
116
117     @Activate
118     public NetconfTopologyManager(@Reference final BaseNetconfSchemas baseSchemas,
119             @Reference final DataBroker dataBroker,
120             @Reference final ClusterSingletonServiceProvider clusterSingletonServiceProvider,
121             @Reference(target = "(type=global-netconf-ssh-scheduled-executor)")
122                 final ScheduledThreadPool keepaliveExecutor,
123             @Reference(target = "(type=global-netconf-processing-executor)") final ThreadPool processingExecutor,
124             @Reference final ActorSystemProvider actorSystemProvider,
125             @Reference(target = "(type=global-event-executor)") final EventExecutor eventExecutor,
126             @Reference(target = "(type=netconf-client-factory)") final NetconfClientFactory clientFactory,
127             @Reference final DOMMountPointService mountPointService,
128             @Reference final AAAEncryptionService encryptionService,
129             @Reference final RpcProviderService rpcProviderService,
130             @Reference final DeviceActionFactory deviceActionFactory,
131             @Reference final SchemaResourceManager resourceManager,
132             @Reference final NetconfClientConfigurationBuilderFactory builderFactory,
133             final Configuration configuration) {
134         this(baseSchemas, dataBroker, clusterSingletonServiceProvider, keepaliveExecutor.getExecutor(),
135             processingExecutor.getExecutor(), actorSystemProvider.getActorSystem(), eventExecutor, clientFactory,
136             mountPointService, encryptionService, rpcProviderService, deviceActionFactory, resourceManager,
137             builderFactory, configuration.topology$_$id(),
138             Uint16.valueOf(configuration.write$_$transaction$_$idle$_$timeout()));
139     }
140
141     @Inject
142     public NetconfTopologyManager(final BaseNetconfSchemas baseSchemas, final DataBroker dataBroker,
143             final ClusterSingletonServiceProvider clusterSingletonServiceProvider,
144             final ScheduledThreadPool keepaliveExecutor, final ThreadPool processingExecutor,
145             final ActorSystemProvider actorSystemProvider, final EventExecutor eventExecutor,
146             final NetconfClientFactory clientFactory, final DOMMountPointService mountPointService,
147             final AAAEncryptionService encryptionService, final RpcProviderService rpcProviderService,
148             final DeviceActionFactory deviceActionFactory, final SchemaResourceManager resourceManager,
149             final NetconfClientConfigurationBuilderFactory builderFactory) {
150         this(baseSchemas, dataBroker, clusterSingletonServiceProvider, keepaliveExecutor.getExecutor(),
151             processingExecutor.getExecutor(), actorSystemProvider.getActorSystem(), eventExecutor, clientFactory,
152             mountPointService, encryptionService, rpcProviderService, deviceActionFactory, resourceManager,
153             builderFactory, NetconfNodeUtils.DEFAULT_TOPOLOGY_NAME, Uint16.ZERO);
154     }
155
156     @SuppressFBWarnings(value = "MC_OVERRIDABLE_METHOD_CALL_IN_CONSTRUCTOR",
157         justification = "Non-final for mocking, but we register for DTCL and that leaks 'this'")
158     public NetconfTopologyManager(final BaseNetconfSchemas baseSchemas, final DataBroker dataBroker,
159             final ClusterSingletonServiceProvider clusterSingletonServiceProvider,
160             final ScheduledExecutorService keepaliveExecutor, final Executor processingExecutor,
161             final ActorSystem actorSystem, final EventExecutor eventExecutor,
162             final NetconfClientFactory clientFactory, final DOMMountPointService mountPointService,
163             final AAAEncryptionService encryptionService, final RpcProviderService rpcProviderService,
164             final DeviceActionFactory deviceActionFactory, final SchemaResourceManager resourceManager,
165             final NetconfClientConfigurationBuilderFactory builderFactory, final String topologyId,
166             final Uint16 writeTransactionIdleTimeout) {
167         this.baseSchemas = requireNonNull(baseSchemas);
168         this.dataBroker = requireNonNull(dataBroker);
169         this.clusterSingletonServiceProvider = requireNonNull(clusterSingletonServiceProvider);
170         this.keepaliveExecutor = requireNonNull(keepaliveExecutor);
171         this.processingExecutor = requireNonNull(processingExecutor);
172         this.actorSystem = requireNonNull(actorSystem);
173         this.eventExecutor = requireNonNull(eventExecutor);
174         this.clientFactory = requireNonNull(clientFactory);
175         this.topologyId = requireNonNull(topologyId);
176         writeTxIdleTimeout = Duration.ofSeconds(writeTransactionIdleTimeout.toJava());
177         this.mountPointService = mountPointService;
178         this.deviceActionFactory = requireNonNull(deviceActionFactory);
179         this.resourceManager = requireNonNull(resourceManager);
180         this.builderFactory = requireNonNull(builderFactory);
181
182         dataChangeListenerRegistration = registerDataTreeChangeListener();
183         rpcProvider = new NetconfTopologyRPCProvider(rpcProviderService, dataBroker, encryptionService, topologyId);
184     }
185
186     @Override
187     public void onDataTreeChanged(final Collection<DataTreeModification<Node>> changes) {
188         for (final DataTreeModification<Node> change : changes) {
189             final DataObjectModification<Node> rootNode = change.getRootNode();
190             final InstanceIdentifier<Node> dataModifIdent = change.getRootPath().getRootIdentifier();
191             final NodeId nodeId = NetconfTopologyUtils.getNodeId(rootNode.getIdentifier());
192             switch (rootNode.getModificationType()) {
193                 case SUBTREE_MODIFIED:
194                     LOG.debug("Config for node {} updated", nodeId);
195                     refreshNetconfDeviceContext(dataModifIdent, rootNode.getDataAfter());
196                     break;
197                 case WRITE:
198                     if (contexts.containsKey(dataModifIdent)) {
199                         LOG.debug("RemoteDevice{{}} was already configured, reconfiguring node...", nodeId);
200                         refreshNetconfDeviceContext(dataModifIdent, rootNode.getDataAfter());
201                     } else {
202                         LOG.debug("Config for node {} created", nodeId);
203                         startNetconfDeviceContext(dataModifIdent, rootNode.getDataAfter());
204                     }
205                     break;
206                 case DELETE:
207                     LOG.debug("Config for node {} deleted", nodeId);
208                     stopNetconfDeviceContext(dataModifIdent);
209                     break;
210                 default:
211                     LOG.warn("Unknown operation for {}.", nodeId);
212             }
213         }
214     }
215
216     private void refreshNetconfDeviceContext(final InstanceIdentifier<Node> instanceIdentifier, final Node node) {
217         final NetconfTopologyContext context = contexts.get(instanceIdentifier);
218         context.refresh(createSetup(instanceIdentifier, node));
219     }
220
221     // ClusterSingletonServiceRegistration registerClusterSingletonService method throws a Runtime exception if there
222     // are problems with registration and client has to deal with it. Only thing we can do if this error occurs is to
223     // retry registration several times and log the error.
224     // TODO change to a specific documented Exception when changed in ClusterSingletonServiceProvider
225     @SuppressWarnings("checkstyle:IllegalCatch")
226     private void startNetconfDeviceContext(final InstanceIdentifier<Node> instanceIdentifier, final Node node) {
227         final NetconfNode netconfNode = requireNonNull(node.augmentation(NetconfNode.class));
228
229         final Timeout actorResponseWaitTime = Timeout.create(
230                 Duration.ofSeconds(netconfNode.getActorResponseWaitTime().toJava()));
231
232         final ServiceGroupIdentifier serviceGroupIdent =
233                 ServiceGroupIdentifier.create(instanceIdentifier.toString());
234
235         final NetconfTopologyContext newNetconfTopologyContext = newNetconfTopologyContext(
236             createSetup(instanceIdentifier, node), serviceGroupIdent, actorResponseWaitTime, deviceActionFactory);
237
238         int tries = 3;
239         while (true) {
240             try {
241                 final ClusterSingletonServiceRegistration clusterSingletonServiceRegistration =
242                         clusterSingletonServiceProvider.registerClusterSingletonService(newNetconfTopologyContext);
243                 clusterRegistrations.put(instanceIdentifier, clusterSingletonServiceRegistration);
244                 contexts.put(instanceIdentifier, newNetconfTopologyContext);
245                 break;
246             } catch (final RuntimeException e) {
247                 LOG.warn("Unable to register cluster singleton service {}, trying again", newNetconfTopologyContext, e);
248
249                 if (--tries <= 0) {
250                     LOG.error("Unable to register cluster singleton service {} - done trying, closing topology context",
251                             newNetconfTopologyContext, e);
252                     close(newNetconfTopologyContext);
253                     break;
254                 }
255             }
256         }
257     }
258
259     private void stopNetconfDeviceContext(final InstanceIdentifier<Node> instanceIdentifier) {
260         final NetconfTopologyContext netconfTopologyContext = contexts.remove(instanceIdentifier);
261         if (netconfTopologyContext != null) {
262             close(clusterRegistrations.remove(instanceIdentifier));
263             close(netconfTopologyContext);
264         }
265     }
266
267     @VisibleForTesting
268     protected NetconfTopologyContext newNetconfTopologyContext(final NetconfTopologySetup setup,
269             final ServiceGroupIdentifier serviceGroupIdent, final Timeout actorResponseWaitTime,
270             final DeviceActionFactory deviceActionFact) {
271         return new NetconfTopologyContext(resourceManager, mountPointService, builderFactory, deviceActionFactory,
272             actorResponseWaitTime, serviceGroupIdent, setup);
273     }
274
275     @PreDestroy
276     @Deactivate
277     @Override
278     public void close() {
279         if (rpcProvider != null) {
280             rpcProvider.close();
281             rpcProvider = null;
282         }
283         if (dataChangeListenerRegistration != null) {
284             dataChangeListenerRegistration.close();
285             dataChangeListenerRegistration = null;
286         }
287
288         contexts.values().forEach(NetconfTopologyManager::close);
289         clusterRegistrations.values().forEach(NetconfTopologyManager::close);
290
291         contexts.clear();
292         clusterRegistrations.clear();
293     }
294
295     @SuppressWarnings("checkstyle:IllegalCatch")
296     private static void close(final AutoCloseable closeable) {
297         try {
298             closeable.close();
299         } catch (Exception e) {
300             LOG.warn("Error closing {}", closeable, e);
301         }
302     }
303
304     private ListenerRegistration<NetconfTopologyManager> registerDataTreeChangeListener() {
305         final WriteTransaction wtx = dataBroker.newWriteOnlyTransaction();
306         // FIXME: how does this play out with lifecycle? In a cluster, someone needs to ensure this call happens, but
307         //        also we need to to make sure config -> oper is properly synchronized. Non-clustered case relies on
308         //        oper being transient and perhaps on a put() instead, how do we handle that in the clustered case?
309         wtx.merge(LogicalDatastoreType.OPERATIONAL, InstanceIdentifier.builder(NetworkTopology.class)
310             .child(Topology.class, new TopologyKey(new TopologyId(topologyId)))
311             .build(), new TopologyBuilder().setTopologyId(new TopologyId(topologyId)).build());
312         wtx.commit().addCallback(new FutureCallback<CommitInfo>() {
313             @Override
314             public void onSuccess(final CommitInfo result) {
315                 LOG.debug("topology initialization successful");
316             }
317
318             @Override
319             public void onFailure(final Throwable throwable) {
320                 LOG.error("Unable to initialize netconf-topology", throwable);
321             }
322         }, MoreExecutors.directExecutor());
323
324         LOG.debug("Registering datastore listener");
325         return dataBroker.registerDataTreeChangeListener(DataTreeIdentifier.create(LogicalDatastoreType.CONFIGURATION,
326             NetconfTopologyUtils.createTopologyListPath(topologyId).child(Node.class)), this);
327     }
328
329     private NetconfTopologySetup createSetup(final InstanceIdentifier<Node> instanceIdentifier, final Node node) {
330         final NetconfNode netconfNode = node.augmentation(NetconfNode.class);
331         final RemoteDeviceId deviceId = NetconfNodeUtils.toRemoteDeviceId(node.getNodeId(), netconfNode);
332
333         return NetconfTopologySetupBuilder.create()
334                 .setClusterSingletonServiceProvider(clusterSingletonServiceProvider)
335                 .setBaseSchemas(baseSchemas)
336                 .setDataBroker(dataBroker)
337                 .setInstanceIdentifier(instanceIdentifier)
338                 .setNode(node)
339                 .setActorSystem(actorSystem)
340                 .setEventExecutor(eventExecutor)
341                 .setKeepaliveExecutor(keepaliveExecutor)
342                 .setProcessingExecutor(processingExecutor)
343                 .setTopologyId(topologyId)
344                 .setNetconfClientFactory(clientFactory)
345                 .setSchemaResourceDTO(resourceManager.getSchemaResources(netconfNode.getSchemaCacheDirectory(),
346                     deviceId))
347                 .setIdleTimeout(writeTxIdleTimeout)
348                 .build();
349     }
350 }