1568e43c60ced697a52799e4a7100e2d3e67000b
[netconf.git] / apps / netconf-topology-singleton / src / main / java / org / opendaylight / netconf / topology / singleton / impl / NetconfTopologyManager.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.netconf.topology.singleton.impl;
9
10 import static java.util.Objects.requireNonNull;
11
12 import akka.actor.ActorSystem;
13 import akka.util.Timeout;
14 import com.google.common.annotations.VisibleForTesting;
15 import com.google.common.util.concurrent.FutureCallback;
16 import com.google.common.util.concurrent.MoreExecutors;
17 import io.netty.util.concurrent.EventExecutor;
18 import java.time.Duration;
19 import java.util.Collection;
20 import java.util.Map;
21 import java.util.concurrent.ConcurrentHashMap;
22 import java.util.concurrent.Executor;
23 import java.util.concurrent.ScheduledExecutorService;
24 import org.opendaylight.aaa.encrypt.AAAEncryptionService;
25 import org.opendaylight.controller.cluster.ActorSystemProvider;
26 import org.opendaylight.controller.config.threadpool.ScheduledThreadPool;
27 import org.opendaylight.controller.config.threadpool.ThreadPool;
28 import org.opendaylight.mdsal.binding.api.ClusteredDataTreeChangeListener;
29 import org.opendaylight.mdsal.binding.api.DataBroker;
30 import org.opendaylight.mdsal.binding.api.DataObjectModification;
31 import org.opendaylight.mdsal.binding.api.DataTreeIdentifier;
32 import org.opendaylight.mdsal.binding.api.DataTreeModification;
33 import org.opendaylight.mdsal.binding.api.RpcProviderService;
34 import org.opendaylight.mdsal.binding.api.WriteTransaction;
35 import org.opendaylight.mdsal.common.api.CommitInfo;
36 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
37 import org.opendaylight.mdsal.dom.api.DOMActionProviderService;
38 import org.opendaylight.mdsal.dom.api.DOMMountPointService;
39 import org.opendaylight.mdsal.dom.api.DOMRpcProviderService;
40 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
41 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceRegistration;
42 import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier;
43 import org.opendaylight.netconf.client.NetconfClientDispatcher;
44 import org.opendaylight.netconf.client.mdsal.api.BaseNetconfSchemas;
45 import org.opendaylight.netconf.client.mdsal.api.DeviceActionFactory;
46 import org.opendaylight.netconf.client.mdsal.api.RemoteDeviceId;
47 import org.opendaylight.netconf.client.mdsal.api.SchemaResourceManager;
48 import org.opendaylight.netconf.topology.singleton.api.NetconfTopologySingletonService;
49 import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup;
50 import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup.NetconfTopologySetupBuilder;
51 import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologyUtils;
52 import org.opendaylight.netconf.topology.spi.NetconfClientConfigurationBuilderFactory;
53 import org.opendaylight.netconf.topology.spi.NetconfNodeUtils;
54 import org.opendaylight.netconf.topology.spi.NetconfTopologyRPCProvider;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev221225.NetconfNode;
56 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev221225.NetconfNodeTopologyService;
57 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.topology.singleton.config.rev170419.Config;
58 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology;
59 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
60 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.TopologyId;
61 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
62 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyBuilder;
63 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey;
64 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
65 import org.opendaylight.yangtools.concepts.ListenerRegistration;
66 import org.opendaylight.yangtools.concepts.Registration;
67 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
68 import org.slf4j.Logger;
69 import org.slf4j.LoggerFactory;
70
71 public class NetconfTopologyManager
72         implements ClusteredDataTreeChangeListener<Node>, NetconfTopologySingletonService, AutoCloseable {
73
74     private static final Logger LOG = LoggerFactory.getLogger(NetconfTopologyManager.class);
75
76     private final Map<InstanceIdentifier<Node>, NetconfTopologyContext> contexts = new ConcurrentHashMap<>();
77     private final Map<InstanceIdentifier<Node>, ClusterSingletonServiceRegistration>
78             clusterRegistrations = new ConcurrentHashMap<>();
79
80     private final BaseNetconfSchemas baseSchemas;
81     private final DataBroker dataBroker;
82     private final DOMRpcProviderService rpcProviderRegistry;
83     private final DOMActionProviderService actionProviderRegistry;
84     private final ClusterSingletonServiceProvider clusterSingletonServiceProvider;
85     private final ScheduledExecutorService keepaliveExecutorService;
86     private final Executor processingExecutorService;
87     private final ActorSystem actorSystem;
88     private final EventExecutor eventExecutor;
89     private final NetconfClientDispatcher clientDispatcher;
90     private final String topologyId;
91     private final Duration writeTxIdleTimeout;
92     private final DOMMountPointService mountPointService;
93     private final AAAEncryptionService encryptionService;
94     private final RpcProviderService rpcProviderService;
95     private final DeviceActionFactory deviceActionFactory;
96     private final NetconfClientConfigurationBuilderFactory builderFactory;
97     private final SchemaResourceManager resourceManager;
98
99     private ListenerRegistration<NetconfTopologyManager> dataChangeListenerRegistration;
100     private Registration rpcReg;
101
102
103     public NetconfTopologyManager(final BaseNetconfSchemas baseSchemas, final DataBroker dataBroker,
104                                   final DOMRpcProviderService rpcProviderRegistry,
105                                   final DOMActionProviderService actionProviderService,
106                                   final ClusterSingletonServiceProvider clusterSingletonServiceProvider,
107                                   final ScheduledThreadPool keepaliveExecutor, final ThreadPool processingExecutor,
108                                   final ActorSystemProvider actorSystemProvider,
109                                   final EventExecutor eventExecutor, final NetconfClientDispatcher clientDispatcher,
110                                   final String topologyId, final Config config,
111                                   final DOMMountPointService mountPointService,
112                                   final AAAEncryptionService encryptionService,
113                                   final RpcProviderService rpcProviderService,
114                                   final DeviceActionFactory deviceActionFactory,
115                                   final SchemaResourceManager resourceManager,
116                                   final NetconfClientConfigurationBuilderFactory builderFactory) {
117         this.baseSchemas = requireNonNull(baseSchemas);
118         this.dataBroker = requireNonNull(dataBroker);
119         this.rpcProviderRegistry = requireNonNull(rpcProviderRegistry);
120         actionProviderRegistry = requireNonNull(actionProviderService);
121         this.clusterSingletonServiceProvider = requireNonNull(clusterSingletonServiceProvider);
122         keepaliveExecutorService = keepaliveExecutor.getExecutor();
123         processingExecutorService = processingExecutor.getExecutor();
124         actorSystem = requireNonNull(actorSystemProvider).getActorSystem();
125         this.eventExecutor = requireNonNull(eventExecutor);
126         this.clientDispatcher = requireNonNull(clientDispatcher);
127         this.topologyId = requireNonNull(topologyId);
128         writeTxIdleTimeout = Duration.ofSeconds(config.getWriteTransactionIdleTimeout().toJava());
129         this.mountPointService = mountPointService;
130         this.encryptionService = requireNonNull(encryptionService);
131         this.rpcProviderService = requireNonNull(rpcProviderService);
132         this.deviceActionFactory = requireNonNull(deviceActionFactory);
133         this.resourceManager = requireNonNull(resourceManager);
134         this.builderFactory = requireNonNull(builderFactory);
135     }
136
137     // Blueprint init method
138     public void init() {
139         dataChangeListenerRegistration = registerDataTreeChangeListener();
140         rpcReg = rpcProviderService.registerRpcImplementation(NetconfNodeTopologyService.class,
141             new NetconfTopologyRPCProvider(dataBroker, encryptionService, topologyId));
142     }
143
144     @Override
145     public void onDataTreeChanged(final Collection<DataTreeModification<Node>> changes) {
146         for (final DataTreeModification<Node> change : changes) {
147             final DataObjectModification<Node> rootNode = change.getRootNode();
148             final InstanceIdentifier<Node> dataModifIdent = change.getRootPath().getRootIdentifier();
149             final NodeId nodeId = NetconfTopologyUtils.getNodeId(rootNode.getIdentifier());
150             switch (rootNode.getModificationType()) {
151                 case SUBTREE_MODIFIED:
152                     LOG.debug("Config for node {} updated", nodeId);
153                     refreshNetconfDeviceContext(dataModifIdent, rootNode.getDataAfter());
154                     break;
155                 case WRITE:
156                     if (contexts.containsKey(dataModifIdent)) {
157                         LOG.debug("RemoteDevice{{}} was already configured, reconfiguring node...", nodeId);
158                         refreshNetconfDeviceContext(dataModifIdent, rootNode.getDataAfter());
159                     } else {
160                         LOG.debug("Config for node {} created", nodeId);
161                         startNetconfDeviceContext(dataModifIdent, rootNode.getDataAfter());
162                     }
163                     break;
164                 case DELETE:
165                     LOG.debug("Config for node {} deleted", nodeId);
166                     stopNetconfDeviceContext(dataModifIdent);
167                     break;
168                 default:
169                     LOG.warn("Unknown operation for {}.", nodeId);
170             }
171         }
172     }
173
174     private void refreshNetconfDeviceContext(final InstanceIdentifier<Node> instanceIdentifier, final Node node) {
175         final NetconfTopologyContext context = contexts.get(instanceIdentifier);
176         context.refresh(createSetup(instanceIdentifier, node));
177     }
178
179     // ClusterSingletonServiceRegistration registerClusterSingletonService method throws a Runtime exception if there
180     // are problems with registration and client has to deal with it. Only thing we can do if this error occurs is to
181     // retry registration several times and log the error.
182     // TODO change to a specific documented Exception when changed in ClusterSingletonServiceProvider
183     @SuppressWarnings("checkstyle:IllegalCatch")
184     private void startNetconfDeviceContext(final InstanceIdentifier<Node> instanceIdentifier, final Node node) {
185         final NetconfNode netconfNode = requireNonNull(node.augmentation(NetconfNode.class));
186
187         final Timeout actorResponseWaitTime = Timeout.create(
188                 Duration.ofSeconds(netconfNode.getActorResponseWaitTime().toJava()));
189
190         final ServiceGroupIdentifier serviceGroupIdent =
191                 ServiceGroupIdentifier.create(instanceIdentifier.toString());
192
193         final NetconfTopologyContext newNetconfTopologyContext = newNetconfTopologyContext(
194             createSetup(instanceIdentifier, node), serviceGroupIdent, actorResponseWaitTime, deviceActionFactory);
195
196         int tries = 3;
197         while (true) {
198             try {
199                 final ClusterSingletonServiceRegistration clusterSingletonServiceRegistration =
200                         clusterSingletonServiceProvider.registerClusterSingletonService(newNetconfTopologyContext);
201                 clusterRegistrations.put(instanceIdentifier, clusterSingletonServiceRegistration);
202                 contexts.put(instanceIdentifier, newNetconfTopologyContext);
203                 break;
204             } catch (final RuntimeException e) {
205                 LOG.warn("Unable to register cluster singleton service {}, trying again", newNetconfTopologyContext, e);
206
207                 if (--tries <= 0) {
208                     LOG.error("Unable to register cluster singleton service {} - done trying, closing topology context",
209                             newNetconfTopologyContext, e);
210                     close(newNetconfTopologyContext);
211                     break;
212                 }
213             }
214         }
215     }
216
217     private void stopNetconfDeviceContext(final InstanceIdentifier<Node> instanceIdentifier) {
218         final NetconfTopologyContext netconfTopologyContext = contexts.remove(instanceIdentifier);
219         if (netconfTopologyContext != null) {
220             close(clusterRegistrations.remove(instanceIdentifier));
221             close(netconfTopologyContext);
222         }
223     }
224
225     @VisibleForTesting
226     protected NetconfTopologyContext newNetconfTopologyContext(final NetconfTopologySetup setup,
227             final ServiceGroupIdentifier serviceGroupIdent, final Timeout actorResponseWaitTime,
228             final DeviceActionFactory deviceActionFact) {
229         return new NetconfTopologyContext(resourceManager, mountPointService, builderFactory, deviceActionFactory,
230             actorResponseWaitTime, serviceGroupIdent, setup);
231     }
232
233     @Override
234     public void close() {
235         if (rpcReg != null) {
236             rpcReg.close();
237             rpcReg = null;
238         }
239         if (dataChangeListenerRegistration != null) {
240             dataChangeListenerRegistration.close();
241             dataChangeListenerRegistration = null;
242         }
243
244         contexts.values().forEach(NetconfTopologyManager::close);
245         clusterRegistrations.values().forEach(NetconfTopologyManager::close);
246
247         contexts.clear();
248         clusterRegistrations.clear();
249     }
250
251     @SuppressWarnings("checkstyle:IllegalCatch")
252     private static void close(final AutoCloseable closeable) {
253         try {
254             closeable.close();
255         } catch (Exception e) {
256             LOG.warn("Error closing {}", closeable, e);
257         }
258     }
259
260     private ListenerRegistration<NetconfTopologyManager> registerDataTreeChangeListener() {
261         final WriteTransaction wtx = dataBroker.newWriteOnlyTransaction();
262         // FIXME: how does this play out with lifecycle? In a cluster, someone needs to ensure this call happens, but
263         //        also we need to to make sure config -> oper is properly synchronized. Non-clustered case relies on
264         //        oper being transient and perhaps on a put() instead, how do we handle that in the clustered case?
265         wtx.merge(LogicalDatastoreType.OPERATIONAL, InstanceIdentifier.builder(NetworkTopology.class)
266             .child(Topology.class, new TopologyKey(new TopologyId(topologyId)))
267             .build(), new TopologyBuilder().setTopologyId(new TopologyId(topologyId)).build());
268         wtx.commit().addCallback(new FutureCallback<CommitInfo>() {
269             @Override
270             public void onSuccess(final CommitInfo result) {
271                 LOG.debug("topology initialization successful");
272             }
273
274             @Override
275             public void onFailure(final Throwable throwable) {
276                 LOG.error("Unable to initialize netconf-topology", throwable);
277             }
278         }, MoreExecutors.directExecutor());
279
280         LOG.debug("Registering datastore listener");
281         return dataBroker.registerDataTreeChangeListener(DataTreeIdentifier.create(LogicalDatastoreType.CONFIGURATION,
282             NetconfTopologyUtils.createTopologyListPath(topologyId).child(Node.class)), this);
283     }
284
285     private NetconfTopologySetup createSetup(final InstanceIdentifier<Node> instanceIdentifier, final Node node) {
286         final NetconfNode netconfNode = node.augmentation(NetconfNode.class);
287         final RemoteDeviceId deviceId = NetconfNodeUtils.toRemoteDeviceId(node.getNodeId(), netconfNode);
288
289         return NetconfTopologySetupBuilder.create()
290                 .setClusterSingletonServiceProvider(clusterSingletonServiceProvider)
291                 .setBaseSchemas(baseSchemas)
292                 .setDataBroker(dataBroker)
293                 .setInstanceIdentifier(instanceIdentifier)
294                 .setRpcProviderRegistry(rpcProviderRegistry)
295                 .setActionProviderRegistry(actionProviderRegistry)
296                 .setNode(node)
297                 .setActorSystem(actorSystem)
298                 .setEventExecutor(eventExecutor)
299                 .setKeepaliveExecutor(keepaliveExecutorService)
300                 .setProcessingExecutor(processingExecutorService)
301                 .setTopologyId(topologyId)
302                 .setNetconfClientDispatcher(clientDispatcher)
303                 .setSchemaResourceDTO(resourceManager.getSchemaResources(netconfNode.getSchemaCacheDirectory(),
304                     deviceId))
305                 .setIdleTimeout(writeTxIdleTimeout)
306                 .build();
307     }
308 }