Optimize DefaultNetconfKeystoreAdapter
[netconf.git] / apps / netconf-topology-singleton / src / main / java / org / opendaylight / netconf / topology / singleton / impl / NetconfTopologyManager.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.netconf.topology.singleton.impl;
9
10 import static java.util.Objects.requireNonNull;
11
12 import akka.actor.ActorSystem;
13 import akka.util.Timeout;
14 import com.google.common.annotations.VisibleForTesting;
15 import com.google.common.util.concurrent.FutureCallback;
16 import com.google.common.util.concurrent.ListeningExecutorService;
17 import com.google.common.util.concurrent.MoreExecutors;
18 import io.netty.util.concurrent.EventExecutor;
19 import java.time.Duration;
20 import java.util.Collection;
21 import java.util.Map;
22 import java.util.concurrent.ConcurrentHashMap;
23 import java.util.concurrent.ScheduledExecutorService;
24 import org.opendaylight.aaa.encrypt.AAAEncryptionService;
25 import org.opendaylight.controller.cluster.ActorSystemProvider;
26 import org.opendaylight.controller.config.threadpool.ScheduledThreadPool;
27 import org.opendaylight.controller.config.threadpool.ThreadPool;
28 import org.opendaylight.mdsal.binding.api.ClusteredDataTreeChangeListener;
29 import org.opendaylight.mdsal.binding.api.DataBroker;
30 import org.opendaylight.mdsal.binding.api.DataObjectModification;
31 import org.opendaylight.mdsal.binding.api.DataTreeIdentifier;
32 import org.opendaylight.mdsal.binding.api.DataTreeModification;
33 import org.opendaylight.mdsal.binding.api.RpcProviderService;
34 import org.opendaylight.mdsal.binding.api.WriteTransaction;
35 import org.opendaylight.mdsal.common.api.CommitInfo;
36 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
37 import org.opendaylight.mdsal.dom.api.DOMActionProviderService;
38 import org.opendaylight.mdsal.dom.api.DOMMountPointService;
39 import org.opendaylight.mdsal.dom.api.DOMRpcProviderService;
40 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
41 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceRegistration;
42 import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier;
43 import org.opendaylight.netconf.client.NetconfClientDispatcher;
44 import org.opendaylight.netconf.client.mdsal.api.BaseNetconfSchemas;
45 import org.opendaylight.netconf.client.mdsal.api.DeviceActionFactory;
46 import org.opendaylight.netconf.client.mdsal.api.NetconfKeystoreAdapter;
47 import org.opendaylight.netconf.client.mdsal.api.RemoteDeviceId;
48 import org.opendaylight.netconf.client.mdsal.api.SchemaResourceManager;
49 import org.opendaylight.netconf.topology.singleton.api.NetconfTopologySingletonService;
50 import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup;
51 import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup.NetconfTopologySetupBuilder;
52 import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologyUtils;
53 import org.opendaylight.netconf.topology.spi.NetconfNodeUtils;
54 import org.opendaylight.netconf.topology.spi.NetconfTopologyRPCProvider;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev221225.NetconfNode;
56 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev221225.NetconfNodeTopologyService;
57 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.topology.singleton.config.rev170419.Config;
58 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology;
59 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
60 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.TopologyId;
61 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
62 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyBuilder;
63 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey;
64 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
65 import org.opendaylight.yangtools.concepts.ListenerRegistration;
66 import org.opendaylight.yangtools.concepts.Registration;
67 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
68 import org.slf4j.Logger;
69 import org.slf4j.LoggerFactory;
70
71 public class NetconfTopologyManager
72         implements ClusteredDataTreeChangeListener<Node>, NetconfTopologySingletonService, AutoCloseable {
73
74     private static final Logger LOG = LoggerFactory.getLogger(NetconfTopologyManager.class);
75
76     private final Map<InstanceIdentifier<Node>, NetconfTopologyContext> contexts = new ConcurrentHashMap<>();
77     private final Map<InstanceIdentifier<Node>, ClusterSingletonServiceRegistration>
78             clusterRegistrations = new ConcurrentHashMap<>();
79
80     private final BaseNetconfSchemas baseSchemas;
81     private final DataBroker dataBroker;
82     private final DOMRpcProviderService rpcProviderRegistry;
83     private final DOMActionProviderService actionProviderRegistry;
84     private final ClusterSingletonServiceProvider clusterSingletonServiceProvider;
85     private final ScheduledExecutorService keepaliveExecutor;
86     private final ListeningExecutorService processingExecutor;
87     private final ActorSystem actorSystem;
88     private final EventExecutor eventExecutor;
89     private final NetconfClientDispatcher clientDispatcher;
90     private final String topologyId;
91     private final Duration writeTxIdleTimeout;
92     private final DOMMountPointService mountPointService;
93     private final AAAEncryptionService encryptionService;
94     private final RpcProviderService rpcProviderService;
95     private final DeviceActionFactory deviceActionFactory;
96     private final NetconfKeystoreAdapter keystoreAdapter;
97     private final SchemaResourceManager resourceManager;
98
99     private ListenerRegistration<NetconfTopologyManager> dataChangeListenerRegistration;
100     private Registration rpcReg;
101
102     public NetconfTopologyManager(final BaseNetconfSchemas baseSchemas, final DataBroker dataBroker,
103                                   final DOMRpcProviderService rpcProviderRegistry,
104                                   final DOMActionProviderService actionProviderService,
105                                   final ClusterSingletonServiceProvider clusterSingletonServiceProvider,
106                                   final ScheduledThreadPool keepaliveExecutor, final ThreadPool processingExecutor,
107                                   final ActorSystemProvider actorSystemProvider,
108                                   final EventExecutor eventExecutor, final NetconfClientDispatcher clientDispatcher,
109                                   final String topologyId, final Config config,
110                                   final DOMMountPointService mountPointService,
111                                   final AAAEncryptionService encryptionService,
112                                   final RpcProviderService rpcProviderService,
113                                   final DeviceActionFactory deviceActionFactory,
114                                   final SchemaResourceManager resourceManager,
115                                   final NetconfKeystoreAdapter keystoreAdapter) {
116         this.baseSchemas = requireNonNull(baseSchemas);
117         this.dataBroker = requireNonNull(dataBroker);
118         this.rpcProviderRegistry = requireNonNull(rpcProviderRegistry);
119         actionProviderRegistry = requireNonNull(actionProviderService);
120         this.clusterSingletonServiceProvider = requireNonNull(clusterSingletonServiceProvider);
121         this.keepaliveExecutor = keepaliveExecutor.getExecutor();
122         this.processingExecutor = MoreExecutors.listeningDecorator(processingExecutor.getExecutor());
123         actorSystem = requireNonNull(actorSystemProvider).getActorSystem();
124         this.eventExecutor = requireNonNull(eventExecutor);
125         this.clientDispatcher = requireNonNull(clientDispatcher);
126         this.topologyId = requireNonNull(topologyId);
127         writeTxIdleTimeout = Duration.ofSeconds(config.getWriteTransactionIdleTimeout().toJava());
128         this.mountPointService = mountPointService;
129         this.encryptionService = requireNonNull(encryptionService);
130         this.rpcProviderService = requireNonNull(rpcProviderService);
131         this.deviceActionFactory = requireNonNull(deviceActionFactory);
132         this.resourceManager = requireNonNull(resourceManager);
133         this.keystoreAdapter = requireNonNull(keystoreAdapter);
134     }
135
136     // Blueprint init method
137     public void init() {
138         dataChangeListenerRegistration = registerDataTreeChangeListener();
139         rpcReg = rpcProviderService.registerRpcImplementation(NetconfNodeTopologyService.class,
140             new NetconfTopologyRPCProvider(dataBroker, encryptionService, topologyId));
141     }
142
143     @Override
144     public void onDataTreeChanged(final Collection<DataTreeModification<Node>> changes) {
145         for (final DataTreeModification<Node> change : changes) {
146             final DataObjectModification<Node> rootNode = change.getRootNode();
147             final InstanceIdentifier<Node> dataModifIdent = change.getRootPath().getRootIdentifier();
148             final NodeId nodeId = NetconfTopologyUtils.getNodeId(rootNode.getIdentifier());
149             switch (rootNode.getModificationType()) {
150                 case SUBTREE_MODIFIED:
151                     LOG.debug("Config for node {} updated", nodeId);
152                     refreshNetconfDeviceContext(dataModifIdent, rootNode.getDataAfter());
153                     break;
154                 case WRITE:
155                     if (contexts.containsKey(dataModifIdent)) {
156                         LOG.debug("RemoteDevice{{}} was already configured, reconfiguring node...", nodeId);
157                         refreshNetconfDeviceContext(dataModifIdent, rootNode.getDataAfter());
158                     } else {
159                         LOG.debug("Config for node {} created", nodeId);
160                         startNetconfDeviceContext(dataModifIdent, rootNode.getDataAfter());
161                     }
162                     break;
163                 case DELETE:
164                     LOG.debug("Config for node {} deleted", nodeId);
165                     stopNetconfDeviceContext(dataModifIdent);
166                     break;
167                 default:
168                     LOG.warn("Unknown operation for {}.", nodeId);
169             }
170         }
171     }
172
173     private void refreshNetconfDeviceContext(final InstanceIdentifier<Node> instanceIdentifier, final Node node) {
174         final NetconfTopologyContext context = contexts.get(instanceIdentifier);
175         context.refresh(createSetup(instanceIdentifier, node));
176     }
177
178     // ClusterSingletonServiceRegistration registerClusterSingletonService method throws a Runtime exception if there
179     // are problems with registration and client has to deal with it. Only thing we can do if this error occurs is to
180     // retry registration several times and log the error.
181     // TODO change to a specific documented Exception when changed in ClusterSingletonServiceProvider
182     @SuppressWarnings("checkstyle:IllegalCatch")
183     private void startNetconfDeviceContext(final InstanceIdentifier<Node> instanceIdentifier, final Node node) {
184         final NetconfNode netconfNode = requireNonNull(node.augmentation(NetconfNode.class));
185
186         final Timeout actorResponseWaitTime = Timeout.create(
187                 Duration.ofSeconds(netconfNode.getActorResponseWaitTime().toJava()));
188
189         final ServiceGroupIdentifier serviceGroupIdent =
190                 ServiceGroupIdentifier.create(instanceIdentifier.toString());
191
192         final NetconfTopologyContext newNetconfTopologyContext = newNetconfTopologyContext(
193             createSetup(instanceIdentifier, node), serviceGroupIdent, actorResponseWaitTime, deviceActionFactory);
194
195         int tries = 3;
196         while (true) {
197             try {
198                 final ClusterSingletonServiceRegistration clusterSingletonServiceRegistration =
199                         clusterSingletonServiceProvider.registerClusterSingletonService(newNetconfTopologyContext);
200                 clusterRegistrations.put(instanceIdentifier, clusterSingletonServiceRegistration);
201                 contexts.put(instanceIdentifier, newNetconfTopologyContext);
202                 break;
203             } catch (final RuntimeException e) {
204                 LOG.warn("Unable to register cluster singleton service {}, trying again", newNetconfTopologyContext, e);
205
206                 if (--tries <= 0) {
207                     LOG.error("Unable to register cluster singleton service {} - done trying, closing topology context",
208                             newNetconfTopologyContext, e);
209                     close(newNetconfTopologyContext);
210                     break;
211                 }
212             }
213         }
214     }
215
216     private void stopNetconfDeviceContext(final InstanceIdentifier<Node> instanceIdentifier) {
217         final NetconfTopologyContext netconfTopologyContext = contexts.remove(instanceIdentifier);
218         if (netconfTopologyContext != null) {
219             close(clusterRegistrations.remove(instanceIdentifier));
220             close(netconfTopologyContext);
221         }
222     }
223
224     @VisibleForTesting
225     protected NetconfTopologyContext newNetconfTopologyContext(final NetconfTopologySetup setup,
226             final ServiceGroupIdentifier serviceGroupIdent, final Timeout actorResponseWaitTime,
227             final DeviceActionFactory deviceActionFact) {
228         return new NetconfTopologyContext(setup, serviceGroupIdent, actorResponseWaitTime, mountPointService,
229             deviceActionFact);
230     }
231
232     @Override
233     public void close() {
234         if (rpcReg != null) {
235             rpcReg.close();
236             rpcReg = null;
237         }
238         if (dataChangeListenerRegistration != null) {
239             dataChangeListenerRegistration.close();
240             dataChangeListenerRegistration = null;
241         }
242
243         contexts.values().forEach(NetconfTopologyManager::close);
244         clusterRegistrations.values().forEach(NetconfTopologyManager::close);
245
246         contexts.clear();
247         clusterRegistrations.clear();
248     }
249
250     @SuppressWarnings("checkstyle:IllegalCatch")
251     private static void close(final AutoCloseable closeable) {
252         try {
253             closeable.close();
254         } catch (Exception e) {
255             LOG.warn("Error closing {}", closeable, e);
256         }
257     }
258
259     private ListenerRegistration<NetconfTopologyManager> registerDataTreeChangeListener() {
260         final WriteTransaction wtx = dataBroker.newWriteOnlyTransaction();
261         // FIXME: how does this play out with lifecycle? In a cluster, someone needs to ensure this call happens, but
262         //        also we need to to make sure config -> oper is properly synchronized. Non-clustered case relies on
263         //        oper being transient and perhaps on a put() instead, how do we handle that in the clustered case?
264         wtx.merge(LogicalDatastoreType.OPERATIONAL, InstanceIdentifier.builder(NetworkTopology.class)
265             .child(Topology.class, new TopologyKey(new TopologyId(topologyId)))
266             .build(), new TopologyBuilder().setTopologyId(new TopologyId(topologyId)).build());
267         wtx.commit().addCallback(new FutureCallback<CommitInfo>() {
268             @Override
269             public void onSuccess(final CommitInfo result) {
270                 LOG.debug("topology initialization successful");
271             }
272
273             @Override
274             public void onFailure(final Throwable throwable) {
275                 LOG.error("Unable to initialize netconf-topology", throwable);
276             }
277         }, MoreExecutors.directExecutor());
278
279         LOG.debug("Registering datastore listener");
280         return dataBroker.registerDataTreeChangeListener(DataTreeIdentifier.create(LogicalDatastoreType.CONFIGURATION,
281             NetconfTopologyUtils.createTopologyListPath(topologyId).child(Node.class)), this);
282     }
283
284     private NetconfTopologySetup createSetup(final InstanceIdentifier<Node> instanceIdentifier, final Node node) {
285         final NetconfNode netconfNode = node.augmentation(NetconfNode.class);
286         final RemoteDeviceId deviceId = NetconfNodeUtils.toRemoteDeviceId(node.getNodeId(), netconfNode);
287
288         return NetconfTopologySetupBuilder.create()
289                 .setClusterSingletonServiceProvider(clusterSingletonServiceProvider)
290                 .setBaseSchemas(baseSchemas)
291                 .setDataBroker(dataBroker)
292                 .setInstanceIdentifier(instanceIdentifier)
293                 .setRpcProviderRegistry(rpcProviderRegistry)
294                 .setActionProviderRegistry(actionProviderRegistry)
295                 .setNode(node)
296                 .setActorSystem(actorSystem)
297                 .setEventExecutor(eventExecutor)
298                 .setKeepaliveExecutor(keepaliveExecutor)
299                 .setProcessingExecutor(processingExecutor)
300                 .setTopologyId(topologyId)
301                 .setNetconfClientDispatcher(clientDispatcher)
302                 .setSchemaResourceDTO(resourceManager.getSchemaResources(netconfNode.getSchemaCacheDirectory(),
303                     deviceId))
304                 .setIdleTimeout(writeTxIdleTimeout)
305                 .setEncryptionService(encryptionService)
306                 .setKeystoreAdapter(keystoreAdapter)
307                 .build();
308     }
309 }