Factor out SchemaResourceManager
[netconf.git] / netconf / netconf-topology-singleton / src / main / java / org / opendaylight / netconf / topology / singleton / impl / NetconfTopologyManager.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.netconf.topology.singleton.impl;
9
10 import static java.util.Objects.requireNonNull;
11
12 import akka.actor.ActorSystem;
13 import akka.util.Timeout;
14 import com.google.common.annotations.VisibleForTesting;
15 import com.google.common.util.concurrent.FutureCallback;
16 import com.google.common.util.concurrent.ListeningExecutorService;
17 import com.google.common.util.concurrent.MoreExecutors;
18 import io.netty.util.concurrent.EventExecutor;
19 import java.util.Collection;
20 import java.util.Map;
21 import java.util.concurrent.ConcurrentHashMap;
22 import java.util.concurrent.ScheduledExecutorService;
23 import java.util.concurrent.TimeUnit;
24 import org.opendaylight.aaa.encrypt.AAAEncryptionService;
25 import org.opendaylight.controller.cluster.ActorSystemProvider;
26 import org.opendaylight.controller.config.threadpool.ScheduledThreadPool;
27 import org.opendaylight.controller.config.threadpool.ThreadPool;
28 import org.opendaylight.mdsal.binding.api.ClusteredDataTreeChangeListener;
29 import org.opendaylight.mdsal.binding.api.DataBroker;
30 import org.opendaylight.mdsal.binding.api.DataObjectModification;
31 import org.opendaylight.mdsal.binding.api.DataTreeIdentifier;
32 import org.opendaylight.mdsal.binding.api.DataTreeModification;
33 import org.opendaylight.mdsal.binding.api.WriteTransaction;
34 import org.opendaylight.mdsal.common.api.CommitInfo;
35 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
36 import org.opendaylight.mdsal.dom.api.DOMActionProviderService;
37 import org.opendaylight.mdsal.dom.api.DOMMountPointService;
38 import org.opendaylight.mdsal.dom.api.DOMRpcProviderService;
39 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
40 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceRegistration;
41 import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier;
42 import org.opendaylight.netconf.client.NetconfClientDispatcher;
43 import org.opendaylight.netconf.sal.connect.api.DeviceActionFactory;
44 import org.opendaylight.netconf.sal.connect.api.SchemaResourceManager;
45 import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
46 import org.opendaylight.netconf.topology.singleton.api.NetconfTopologySingletonService;
47 import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup;
48 import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup.NetconfTopologySetupBuilder;
49 import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologyUtils;
50 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNode;
51 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.topology.singleton.config.rev170419.Config;
52 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology;
53 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopologyBuilder;
54 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
55 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.TopologyId;
56 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
57 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyBuilder;
58 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey;
59 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
60 import org.opendaylight.yangtools.concepts.ListenerRegistration;
61 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
62 import org.slf4j.Logger;
63 import org.slf4j.LoggerFactory;
64 import scala.concurrent.duration.Duration;
65
66 public class NetconfTopologyManager
67         implements ClusteredDataTreeChangeListener<Node>, NetconfTopologySingletonService, AutoCloseable {
68
69     private static final Logger LOG = LoggerFactory.getLogger(NetconfTopologyManager.class);
70
71     private final Map<InstanceIdentifier<Node>, NetconfTopologyContext> contexts = new ConcurrentHashMap<>();
72     private final Map<InstanceIdentifier<Node>, ClusterSingletonServiceRegistration>
73             clusterRegistrations = new ConcurrentHashMap<>();
74
75     private final DataBroker dataBroker;
76     private final DOMRpcProviderService rpcProviderRegistry;
77     private final DOMActionProviderService actionProviderRegistry;
78     private final ClusterSingletonServiceProvider clusterSingletonServiceProvider;
79     private final ScheduledExecutorService keepaliveExecutor;
80     private final ListeningExecutorService processingExecutor;
81     private final ActorSystem actorSystem;
82     private final EventExecutor eventExecutor;
83     private final NetconfClientDispatcher clientDispatcher;
84     private final String topologyId;
85     private final Duration writeTxIdleTimeout;
86     private final DOMMountPointService mountPointService;
87     private final AAAEncryptionService encryptionService;
88     private final DeviceActionFactory deviceActionFactory;
89     private final SchemaResourceManager resourceManager;
90     private ListenerRegistration<NetconfTopologyManager> dataChangeListenerRegistration;
91     private String privateKeyPath;
92     private String privateKeyPassphrase;
93
94     public NetconfTopologyManager(final DataBroker dataBroker, final DOMRpcProviderService rpcProviderRegistry,
95                                   final DOMActionProviderService actionProviderService,
96                                   final ClusterSingletonServiceProvider clusterSingletonServiceProvider,
97                                   final ScheduledThreadPool keepaliveExecutor, final ThreadPool processingExecutor,
98                                   final ActorSystemProvider actorSystemProvider,
99                                   final EventExecutor eventExecutor, final NetconfClientDispatcher clientDispatcher,
100                                   final String topologyId, final Config config,
101                                   final DOMMountPointService mountPointService,
102                                   final AAAEncryptionService encryptionService,
103                                   final DeviceActionFactory deviceActionFactory,
104                                   final SchemaResourceManager resourceManager) {
105         this.dataBroker = requireNonNull(dataBroker);
106         this.rpcProviderRegistry = requireNonNull(rpcProviderRegistry);
107         this.actionProviderRegistry = requireNonNull(actionProviderService);
108         this.clusterSingletonServiceProvider = requireNonNull(clusterSingletonServiceProvider);
109         this.keepaliveExecutor = keepaliveExecutor.getExecutor();
110         this.processingExecutor = MoreExecutors.listeningDecorator(processingExecutor.getExecutor());
111         this.actorSystem = requireNonNull(actorSystemProvider).getActorSystem();
112         this.eventExecutor = requireNonNull(eventExecutor);
113         this.clientDispatcher = requireNonNull(clientDispatcher);
114         this.topologyId = requireNonNull(topologyId);
115         this.writeTxIdleTimeout = Duration.apply(config.getWriteTransactionIdleTimeout().toJava(), TimeUnit.SECONDS);
116         this.mountPointService = mountPointService;
117         this.encryptionService = requireNonNull(encryptionService);
118         this.deviceActionFactory = requireNonNull(deviceActionFactory);
119         this.resourceManager = requireNonNull(resourceManager);
120     }
121
122     // Blueprint init method
123     public void init() {
124         dataChangeListenerRegistration = registerDataTreeChangeListener();
125     }
126
127     @Override
128     public void onDataTreeChanged(final Collection<DataTreeModification<Node>> changes) {
129         for (final DataTreeModification<Node> change : changes) {
130             final DataObjectModification<Node> rootNode = change.getRootNode();
131             final InstanceIdentifier<Node> dataModifIdent = change.getRootPath().getRootIdentifier();
132             final NodeId nodeId = NetconfTopologyUtils.getNodeId(rootNode.getIdentifier());
133             switch (rootNode.getModificationType()) {
134                 case SUBTREE_MODIFIED:
135                     LOG.debug("Config for node {} updated", nodeId);
136                     refreshNetconfDeviceContext(dataModifIdent, rootNode.getDataAfter());
137                     break;
138                 case WRITE:
139                     if (contexts.containsKey(dataModifIdent)) {
140                         LOG.debug("RemoteDevice{{}} was already configured, reconfiguring node...", nodeId);
141                         refreshNetconfDeviceContext(dataModifIdent, rootNode.getDataAfter());
142                     } else {
143                         LOG.debug("Config for node {} created", nodeId);
144                         startNetconfDeviceContext(dataModifIdent, rootNode.getDataAfter());
145                     }
146                     break;
147                 case DELETE:
148                     LOG.debug("Config for node {} deleted", nodeId);
149                     stopNetconfDeviceContext(dataModifIdent);
150                     break;
151                 default:
152                     LOG.warn("Unknown operation for {}.", nodeId);
153             }
154         }
155     }
156
157     private void refreshNetconfDeviceContext(final InstanceIdentifier<Node> instanceIdentifier, final Node node) {
158         final NetconfTopologyContext context = contexts.get(instanceIdentifier);
159         context.refresh(createSetup(instanceIdentifier, node));
160     }
161
162     // ClusterSingletonServiceRegistration registerClusterSingletonService method throws a Runtime exception if there
163     // are problems with registration and client has to deal with it. Only thing we can do if this error occurs is to
164     // retry registration several times and log the error.
165     // TODO change to a specific documented Exception when changed in ClusterSingletonServiceProvider
166     @SuppressWarnings("checkstyle:IllegalCatch")
167     private void startNetconfDeviceContext(final InstanceIdentifier<Node> instanceIdentifier, final Node node) {
168         final NetconfNode netconfNode = node.augmentation(NetconfNode.class);
169         requireNonNull(netconfNode);
170         requireNonNull(netconfNode.getHost());
171         requireNonNull(netconfNode.getHost().getIpAddress());
172
173         final Timeout actorResponseWaitTime = new Timeout(
174                 Duration.create(netconfNode.getActorResponseWaitTime().toJava(), "seconds"));
175
176         final ServiceGroupIdentifier serviceGroupIdent =
177                 ServiceGroupIdentifier.create(instanceIdentifier.toString());
178
179         final NetconfTopologyContext newNetconfTopologyContext = newNetconfTopologyContext(
180             createSetup(instanceIdentifier, node), serviceGroupIdent, actorResponseWaitTime, deviceActionFactory);
181
182         int tries = 3;
183         while (true) {
184             try {
185                 final ClusterSingletonServiceRegistration clusterSingletonServiceRegistration =
186                         clusterSingletonServiceProvider.registerClusterSingletonService(newNetconfTopologyContext);
187                 clusterRegistrations.put(instanceIdentifier, clusterSingletonServiceRegistration);
188                 contexts.put(instanceIdentifier, newNetconfTopologyContext);
189                 break;
190             } catch (final RuntimeException e) {
191                 LOG.warn("Unable to register cluster singleton service {}, trying again", newNetconfTopologyContext, e);
192
193                 if (--tries <= 0) {
194                     LOG.error("Unable to register cluster singleton service {} - done trying, closing topology context",
195                             newNetconfTopologyContext, e);
196                     close(newNetconfTopologyContext);
197                     break;
198                 }
199             }
200         }
201     }
202
203     private void stopNetconfDeviceContext(final InstanceIdentifier<Node> instanceIdentifier) {
204         final NetconfTopologyContext netconfTopologyContext = contexts.remove(instanceIdentifier);
205         if (netconfTopologyContext != null) {
206             close(clusterRegistrations.remove(instanceIdentifier));
207             close(netconfTopologyContext);
208         }
209     }
210
211     @VisibleForTesting
212     protected NetconfTopologyContext newNetconfTopologyContext(final NetconfTopologySetup setup,
213             final ServiceGroupIdentifier serviceGroupIdent, final Timeout actorResponseWaitTime,
214             final DeviceActionFactory deviceActionFact) {
215         return new NetconfTopologyContext(setup, serviceGroupIdent, actorResponseWaitTime, mountPointService,
216             deviceActionFact);
217     }
218
219     @Override
220     public void close() {
221         if (dataChangeListenerRegistration != null) {
222             dataChangeListenerRegistration.close();
223             dataChangeListenerRegistration = null;
224         }
225
226         contexts.values().forEach(NetconfTopologyManager::close);
227         clusterRegistrations.values().forEach(NetconfTopologyManager::close);
228
229         contexts.clear();
230         clusterRegistrations.clear();
231     }
232
233     @SuppressWarnings("checkstyle:IllegalCatch")
234     private static void close(final AutoCloseable closeable) {
235         try {
236             closeable.close();
237         } catch (Exception e) {
238             LOG.warn("Error closing {}", closeable, e);
239         }
240     }
241
242     /**
243      * Sets the private key path from location specified in configuration file using blueprint.
244      */
245     public void setPrivateKeyPath(final String privateKeyPath) {
246         this.privateKeyPath = privateKeyPath;
247     }
248
249     /**
250      * Sets the private key passphrase from location specified in configuration file using blueprint.
251      */
252     public void setPrivateKeyPassphrase(final String privateKeyPassphrase) {
253         this.privateKeyPassphrase = privateKeyPassphrase;
254     }
255
256     private ListenerRegistration<NetconfTopologyManager> registerDataTreeChangeListener() {
257         final WriteTransaction wtx = dataBroker.newWriteOnlyTransaction();
258         initTopology(wtx, LogicalDatastoreType.CONFIGURATION);
259         initTopology(wtx, LogicalDatastoreType.OPERATIONAL);
260         wtx.commit().addCallback(new FutureCallback<CommitInfo>() {
261             @Override
262             public void onSuccess(final CommitInfo result) {
263                 LOG.debug("topology initialization successful");
264             }
265
266             @Override
267             public void onFailure(final Throwable throwable) {
268                 LOG.error("Unable to initialize netconf-topology", throwable);
269             }
270         }, MoreExecutors.directExecutor());
271
272         LOG.debug("Registering datastore listener");
273         return dataBroker.registerDataTreeChangeListener(DataTreeIdentifier.create(LogicalDatastoreType.CONFIGURATION,
274             NetconfTopologyUtils.createTopologyListPath(topologyId).child(Node.class)), this);
275     }
276
277     private void initTopology(final WriteTransaction wtx, final LogicalDatastoreType datastoreType) {
278         final NetworkTopology networkTopology = new NetworkTopologyBuilder().build();
279         final InstanceIdentifier<NetworkTopology> networkTopologyId =
280                 InstanceIdentifier.builder(NetworkTopology.class).build();
281         wtx.merge(datastoreType, networkTopologyId, networkTopology);
282         final Topology topology = new TopologyBuilder().setTopologyId(new TopologyId(topologyId)).build();
283         wtx.merge(datastoreType, networkTopologyId.child(Topology.class,
284                 new TopologyKey(new TopologyId(topologyId))), topology);
285     }
286
287     private NetconfTopologySetup createSetup(final InstanceIdentifier<Node> instanceIdentifier, final Node node) {
288         final NetconfNode netconfNode = node.augmentation(NetconfNode.class);
289         final RemoteDeviceId deviceId = NetconfTopologyUtils.createRemoteDeviceId(node.getNodeId(), netconfNode);
290
291         final NetconfTopologySetupBuilder builder = NetconfTopologySetupBuilder.create()
292                 .setClusterSingletonServiceProvider(clusterSingletonServiceProvider)
293                 .setDataBroker(dataBroker)
294                 .setInstanceIdentifier(instanceIdentifier)
295                 .setRpcProviderRegistry(rpcProviderRegistry)
296                 .setActionProviderRegistry(actionProviderRegistry)
297                 .setNode(node)
298                 .setActorSystem(actorSystem)
299                 .setEventExecutor(eventExecutor)
300                 .setKeepaliveExecutor(keepaliveExecutor)
301                 .setProcessingExecutor(processingExecutor)
302                 .setTopologyId(topologyId)
303                 .setNetconfClientDispatcher(clientDispatcher)
304                 .setSchemaResourceDTO(resourceManager.getSchemaResources(netconfNode, deviceId))
305                 .setIdleTimeout(writeTxIdleTimeout)
306                 .setPrivateKeyPath(privateKeyPath)
307                 .setPrivateKeyPassphrase(privateKeyPassphrase)
308                 .setEncryptionService(encryptionService);
309
310         return builder.build();
311     }
312 }