Split out NetconfClientConfigurationBuilderFactory
[netconf.git] / apps / netconf-topology-singleton / src / main / java / org / opendaylight / netconf / topology / singleton / impl / NetconfTopologyManager.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.netconf.topology.singleton.impl;
9
10 import static java.util.Objects.requireNonNull;
11
12 import akka.actor.ActorSystem;
13 import akka.util.Timeout;
14 import com.google.common.annotations.VisibleForTesting;
15 import com.google.common.util.concurrent.FutureCallback;
16 import com.google.common.util.concurrent.ListeningExecutorService;
17 import com.google.common.util.concurrent.MoreExecutors;
18 import io.netty.util.concurrent.EventExecutor;
19 import java.time.Duration;
20 import java.util.Collection;
21 import java.util.Map;
22 import java.util.concurrent.ConcurrentHashMap;
23 import java.util.concurrent.ScheduledExecutorService;
24 import org.opendaylight.aaa.encrypt.AAAEncryptionService;
25 import org.opendaylight.controller.cluster.ActorSystemProvider;
26 import org.opendaylight.controller.config.threadpool.ScheduledThreadPool;
27 import org.opendaylight.controller.config.threadpool.ThreadPool;
28 import org.opendaylight.mdsal.binding.api.ClusteredDataTreeChangeListener;
29 import org.opendaylight.mdsal.binding.api.DataBroker;
30 import org.opendaylight.mdsal.binding.api.DataObjectModification;
31 import org.opendaylight.mdsal.binding.api.DataTreeIdentifier;
32 import org.opendaylight.mdsal.binding.api.DataTreeModification;
33 import org.opendaylight.mdsal.binding.api.RpcProviderService;
34 import org.opendaylight.mdsal.binding.api.WriteTransaction;
35 import org.opendaylight.mdsal.common.api.CommitInfo;
36 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
37 import org.opendaylight.mdsal.dom.api.DOMActionProviderService;
38 import org.opendaylight.mdsal.dom.api.DOMMountPointService;
39 import org.opendaylight.mdsal.dom.api.DOMRpcProviderService;
40 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
41 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceRegistration;
42 import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier;
43 import org.opendaylight.netconf.client.NetconfClientDispatcher;
44 import org.opendaylight.netconf.client.mdsal.api.BaseNetconfSchemas;
45 import org.opendaylight.netconf.client.mdsal.api.DeviceActionFactory;
46 import org.opendaylight.netconf.client.mdsal.api.RemoteDeviceId;
47 import org.opendaylight.netconf.client.mdsal.api.SchemaResourceManager;
48 import org.opendaylight.netconf.topology.singleton.api.NetconfTopologySingletonService;
49 import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup;
50 import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup.NetconfTopologySetupBuilder;
51 import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologyUtils;
52 import org.opendaylight.netconf.topology.spi.NetconfClientConfigurationBuilderFactory;
53 import org.opendaylight.netconf.topology.spi.NetconfNodeUtils;
54 import org.opendaylight.netconf.topology.spi.NetconfTopologyRPCProvider;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev221225.NetconfNode;
56 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev221225.NetconfNodeTopologyService;
57 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.topology.singleton.config.rev170419.Config;
58 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology;
59 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
60 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.TopologyId;
61 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
62 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyBuilder;
63 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey;
64 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
65 import org.opendaylight.yangtools.concepts.ListenerRegistration;
66 import org.opendaylight.yangtools.concepts.Registration;
67 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
68 import org.slf4j.Logger;
69 import org.slf4j.LoggerFactory;
70
71 public class NetconfTopologyManager
72         implements ClusteredDataTreeChangeListener<Node>, NetconfTopologySingletonService, AutoCloseable {
73
74     private static final Logger LOG = LoggerFactory.getLogger(NetconfTopologyManager.class);
75
76     private final Map<InstanceIdentifier<Node>, NetconfTopologyContext> contexts = new ConcurrentHashMap<>();
77     private final Map<InstanceIdentifier<Node>, ClusterSingletonServiceRegistration>
78             clusterRegistrations = new ConcurrentHashMap<>();
79
80     private final BaseNetconfSchemas baseSchemas;
81     private final DataBroker dataBroker;
82     private final DOMRpcProviderService rpcProviderRegistry;
83     private final DOMActionProviderService actionProviderRegistry;
84     private final ClusterSingletonServiceProvider clusterSingletonServiceProvider;
85     private final ScheduledThreadPool keepaliveExecutor;
86     private final ScheduledExecutorService keepaliveExecutorService;
87     private final ThreadPool processingExecutor;
88     private final ListeningExecutorService processingExecutorService;
89     private final ActorSystem actorSystem;
90     private final EventExecutor eventExecutor;
91     private final NetconfClientDispatcher clientDispatcher;
92     private final String topologyId;
93     private final Duration writeTxIdleTimeout;
94     private final DOMMountPointService mountPointService;
95     private final AAAEncryptionService encryptionService;
96     private final RpcProviderService rpcProviderService;
97     private final DeviceActionFactory deviceActionFactory;
98     private final NetconfClientConfigurationBuilderFactory builderFactory;
99     private final SchemaResourceManager resourceManager;
100
101     private ListenerRegistration<NetconfTopologyManager> dataChangeListenerRegistration;
102     private Registration rpcReg;
103
104
105     public NetconfTopologyManager(final BaseNetconfSchemas baseSchemas, final DataBroker dataBroker,
106                                   final DOMRpcProviderService rpcProviderRegistry,
107                                   final DOMActionProviderService actionProviderService,
108                                   final ClusterSingletonServiceProvider clusterSingletonServiceProvider,
109                                   final ScheduledThreadPool keepaliveExecutor, final ThreadPool processingExecutor,
110                                   final ActorSystemProvider actorSystemProvider,
111                                   final EventExecutor eventExecutor, final NetconfClientDispatcher clientDispatcher,
112                                   final String topologyId, final Config config,
113                                   final DOMMountPointService mountPointService,
114                                   final AAAEncryptionService encryptionService,
115                                   final RpcProviderService rpcProviderService,
116                                   final DeviceActionFactory deviceActionFactory,
117                                   final SchemaResourceManager resourceManager,
118                                   final NetconfClientConfigurationBuilderFactory builderFactory) {
119         this.baseSchemas = requireNonNull(baseSchemas);
120         this.dataBroker = requireNonNull(dataBroker);
121         this.rpcProviderRegistry = requireNonNull(rpcProviderRegistry);
122         actionProviderRegistry = requireNonNull(actionProviderService);
123         this.clusterSingletonServiceProvider = requireNonNull(clusterSingletonServiceProvider);
124         this.keepaliveExecutor = keepaliveExecutor;
125         keepaliveExecutorService = keepaliveExecutor.getExecutor();
126         this.processingExecutor = processingExecutor;
127         processingExecutorService = MoreExecutors.listeningDecorator(processingExecutor.getExecutor());
128         actorSystem = requireNonNull(actorSystemProvider).getActorSystem();
129         this.eventExecutor = requireNonNull(eventExecutor);
130         this.clientDispatcher = requireNonNull(clientDispatcher);
131         this.topologyId = requireNonNull(topologyId);
132         writeTxIdleTimeout = Duration.ofSeconds(config.getWriteTransactionIdleTimeout().toJava());
133         this.mountPointService = mountPointService;
134         this.encryptionService = requireNonNull(encryptionService);
135         this.rpcProviderService = requireNonNull(rpcProviderService);
136         this.deviceActionFactory = requireNonNull(deviceActionFactory);
137         this.resourceManager = requireNonNull(resourceManager);
138         this.builderFactory = requireNonNull(builderFactory);
139     }
140
141     // Blueprint init method
142     public void init() {
143         dataChangeListenerRegistration = registerDataTreeChangeListener();
144         rpcReg = rpcProviderService.registerRpcImplementation(NetconfNodeTopologyService.class,
145             new NetconfTopologyRPCProvider(dataBroker, encryptionService, topologyId));
146     }
147
148     @Override
149     public void onDataTreeChanged(final Collection<DataTreeModification<Node>> changes) {
150         for (final DataTreeModification<Node> change : changes) {
151             final DataObjectModification<Node> rootNode = change.getRootNode();
152             final InstanceIdentifier<Node> dataModifIdent = change.getRootPath().getRootIdentifier();
153             final NodeId nodeId = NetconfTopologyUtils.getNodeId(rootNode.getIdentifier());
154             switch (rootNode.getModificationType()) {
155                 case SUBTREE_MODIFIED:
156                     LOG.debug("Config for node {} updated", nodeId);
157                     refreshNetconfDeviceContext(dataModifIdent, rootNode.getDataAfter());
158                     break;
159                 case WRITE:
160                     if (contexts.containsKey(dataModifIdent)) {
161                         LOG.debug("RemoteDevice{{}} was already configured, reconfiguring node...", nodeId);
162                         refreshNetconfDeviceContext(dataModifIdent, rootNode.getDataAfter());
163                     } else {
164                         LOG.debug("Config for node {} created", nodeId);
165                         startNetconfDeviceContext(dataModifIdent, rootNode.getDataAfter());
166                     }
167                     break;
168                 case DELETE:
169                     LOG.debug("Config for node {} deleted", nodeId);
170                     stopNetconfDeviceContext(dataModifIdent);
171                     break;
172                 default:
173                     LOG.warn("Unknown operation for {}.", nodeId);
174             }
175         }
176     }
177
178     private void refreshNetconfDeviceContext(final InstanceIdentifier<Node> instanceIdentifier, final Node node) {
179         final NetconfTopologyContext context = contexts.get(instanceIdentifier);
180         context.refresh(createSetup(instanceIdentifier, node));
181     }
182
183     // ClusterSingletonServiceRegistration registerClusterSingletonService method throws a Runtime exception if there
184     // are problems with registration and client has to deal with it. Only thing we can do if this error occurs is to
185     // retry registration several times and log the error.
186     // TODO change to a specific documented Exception when changed in ClusterSingletonServiceProvider
187     @SuppressWarnings("checkstyle:IllegalCatch")
188     private void startNetconfDeviceContext(final InstanceIdentifier<Node> instanceIdentifier, final Node node) {
189         final NetconfNode netconfNode = requireNonNull(node.augmentation(NetconfNode.class));
190
191         final Timeout actorResponseWaitTime = Timeout.create(
192                 Duration.ofSeconds(netconfNode.getActorResponseWaitTime().toJava()));
193
194         final ServiceGroupIdentifier serviceGroupIdent =
195                 ServiceGroupIdentifier.create(instanceIdentifier.toString());
196
197         final NetconfTopologyContext newNetconfTopologyContext = newNetconfTopologyContext(
198             createSetup(instanceIdentifier, node), serviceGroupIdent, actorResponseWaitTime, deviceActionFactory);
199
200         int tries = 3;
201         while (true) {
202             try {
203                 final ClusterSingletonServiceRegistration clusterSingletonServiceRegistration =
204                         clusterSingletonServiceProvider.registerClusterSingletonService(newNetconfTopologyContext);
205                 clusterRegistrations.put(instanceIdentifier, clusterSingletonServiceRegistration);
206                 contexts.put(instanceIdentifier, newNetconfTopologyContext);
207                 break;
208             } catch (final RuntimeException e) {
209                 LOG.warn("Unable to register cluster singleton service {}, trying again", newNetconfTopologyContext, e);
210
211                 if (--tries <= 0) {
212                     LOG.error("Unable to register cluster singleton service {} - done trying, closing topology context",
213                             newNetconfTopologyContext, e);
214                     close(newNetconfTopologyContext);
215                     break;
216                 }
217             }
218         }
219     }
220
221     private void stopNetconfDeviceContext(final InstanceIdentifier<Node> instanceIdentifier) {
222         final NetconfTopologyContext netconfTopologyContext = contexts.remove(instanceIdentifier);
223         if (netconfTopologyContext != null) {
224             close(clusterRegistrations.remove(instanceIdentifier));
225             close(netconfTopologyContext);
226         }
227     }
228
229     @VisibleForTesting
230     protected NetconfTopologyContext newNetconfTopologyContext(final NetconfTopologySetup setup,
231             final ServiceGroupIdentifier serviceGroupIdent, final Timeout actorResponseWaitTime,
232             final DeviceActionFactory deviceActionFact) {
233         return new NetconfTopologyContext(setup.getTopologyId(), setup.getNetconfClientDispatcher(),
234                 setup.getEventExecutor(), keepaliveExecutor,
235                 processingExecutor, resourceManager,
236                 dataBroker, mountPointService,
237                 builderFactory, deviceActionFactory,
238                 baseSchemas, actorResponseWaitTime,
239                 serviceGroupIdent, setup);
240     }
241
242     @Override
243     public void close() {
244         if (rpcReg != null) {
245             rpcReg.close();
246             rpcReg = null;
247         }
248         if (dataChangeListenerRegistration != null) {
249             dataChangeListenerRegistration.close();
250             dataChangeListenerRegistration = null;
251         }
252
253         contexts.values().forEach(NetconfTopologyManager::close);
254         clusterRegistrations.values().forEach(NetconfTopologyManager::close);
255
256         contexts.clear();
257         clusterRegistrations.clear();
258     }
259
260     @SuppressWarnings("checkstyle:IllegalCatch")
261     private static void close(final AutoCloseable closeable) {
262         try {
263             closeable.close();
264         } catch (Exception e) {
265             LOG.warn("Error closing {}", closeable, e);
266         }
267     }
268
269     private ListenerRegistration<NetconfTopologyManager> registerDataTreeChangeListener() {
270         final WriteTransaction wtx = dataBroker.newWriteOnlyTransaction();
271         // FIXME: how does this play out with lifecycle? In a cluster, someone needs to ensure this call happens, but
272         //        also we need to to make sure config -> oper is properly synchronized. Non-clustered case relies on
273         //        oper being transient and perhaps on a put() instead, how do we handle that in the clustered case?
274         wtx.merge(LogicalDatastoreType.OPERATIONAL, InstanceIdentifier.builder(NetworkTopology.class)
275             .child(Topology.class, new TopologyKey(new TopologyId(topologyId)))
276             .build(), new TopologyBuilder().setTopologyId(new TopologyId(topologyId)).build());
277         wtx.commit().addCallback(new FutureCallback<CommitInfo>() {
278             @Override
279             public void onSuccess(final CommitInfo result) {
280                 LOG.debug("topology initialization successful");
281             }
282
283             @Override
284             public void onFailure(final Throwable throwable) {
285                 LOG.error("Unable to initialize netconf-topology", throwable);
286             }
287         }, MoreExecutors.directExecutor());
288
289         LOG.debug("Registering datastore listener");
290         return dataBroker.registerDataTreeChangeListener(DataTreeIdentifier.create(LogicalDatastoreType.CONFIGURATION,
291             NetconfTopologyUtils.createTopologyListPath(topologyId).child(Node.class)), this);
292     }
293
294     private NetconfTopologySetup createSetup(final InstanceIdentifier<Node> instanceIdentifier, final Node node) {
295         final NetconfNode netconfNode = node.augmentation(NetconfNode.class);
296         final RemoteDeviceId deviceId = NetconfNodeUtils.toRemoteDeviceId(node.getNodeId(), netconfNode);
297
298         return NetconfTopologySetupBuilder.create()
299                 .setClusterSingletonServiceProvider(clusterSingletonServiceProvider)
300                 .setBaseSchemas(baseSchemas)
301                 .setDataBroker(dataBroker)
302                 .setInstanceIdentifier(instanceIdentifier)
303                 .setRpcProviderRegistry(rpcProviderRegistry)
304                 .setActionProviderRegistry(actionProviderRegistry)
305                 .setNode(node)
306                 .setActorSystem(actorSystem)
307                 .setEventExecutor(eventExecutor)
308                 .setKeepaliveExecutor(keepaliveExecutorService)
309                 .setProcessingExecutor(processingExecutorService)
310                 .setTopologyId(topologyId)
311                 .setNetconfClientDispatcher(clientDispatcher)
312                 .setSchemaResourceDTO(resourceManager.getSchemaResources(netconfNode.getSchemaCacheDirectory(),
313                     deviceId))
314                 .setIdleTimeout(writeTxIdleTimeout)
315                 .build();
316     }
317 }