Merge "Replace deprecated Futures.addCallback by the newer version"
[netconf.git] / netconf / netconf-topology-singleton / src / main / java / org / opendaylight / netconf / topology / singleton / impl / NetconfTopologyManager.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.netconf.topology.singleton.impl;
10
11 import akka.actor.ActorSystem;
12 import akka.util.Timeout;
13 import com.google.common.base.Preconditions;
14 import com.google.common.util.concurrent.FutureCallback;
15 import com.google.common.util.concurrent.Futures;
16 import com.google.common.util.concurrent.MoreExecutors;
17 import io.netty.util.concurrent.EventExecutor;
18 import java.util.Collection;
19 import java.util.HashMap;
20 import java.util.Map;
21 import java.util.concurrent.TimeUnit;
22 import javax.annotation.Nonnull;
23 import org.opendaylight.aaa.encrypt.AAAEncryptionService;
24 import org.opendaylight.controller.cluster.ActorSystemProvider;
25 import org.opendaylight.controller.config.threadpool.ScheduledThreadPool;
26 import org.opendaylight.controller.config.threadpool.ThreadPool;
27 import org.opendaylight.controller.md.sal.binding.api.ClusteredDataTreeChangeListener;
28 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
29 import org.opendaylight.controller.md.sal.binding.api.DataObjectModification;
30 import org.opendaylight.controller.md.sal.binding.api.DataTreeIdentifier;
31 import org.opendaylight.controller.md.sal.binding.api.DataTreeModification;
32 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
33 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
34 import org.opendaylight.controller.md.sal.dom.api.DOMMountPointService;
35 import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
36 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
37 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceRegistration;
38 import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier;
39 import org.opendaylight.netconf.client.NetconfClientDispatcher;
40 import org.opendaylight.netconf.topology.singleton.api.NetconfTopologySingletonService;
41 import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup;
42 import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup.NetconfTopologySetupBuilder;
43 import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologyUtils;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNode;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.topology.singleton.config.rev170419.Config;
46 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology;
47 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopologyBuilder;
48 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
49 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.TopologyId;
50 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
51 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyBuilder;
52 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey;
53 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
54 import org.opendaylight.yangtools.concepts.ListenerRegistration;
55 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
56 import org.slf4j.Logger;
57 import org.slf4j.LoggerFactory;
58 import scala.concurrent.duration.Duration;
59
60 public class NetconfTopologyManager
61         implements ClusteredDataTreeChangeListener<Node>, NetconfTopologySingletonService, AutoCloseable {
62
63     private static final Logger LOG = LoggerFactory.getLogger(NetconfTopologyManager.class);
64
65     private final Map<InstanceIdentifier<Node>, NetconfTopologyContext> contexts = new HashMap<>();
66     private final Map<InstanceIdentifier<Node>, ClusterSingletonServiceRegistration>
67             clusterRegistrations = new HashMap<>();
68
69     private final DataBroker dataBroker;
70     private final RpcProviderRegistry rpcProviderRegistry;
71     private final ClusterSingletonServiceProvider clusterSingletonServiceProvider;
72     private final ScheduledThreadPool keepaliveExecutor;
73     private final ThreadPool processingExecutor;
74     private final ActorSystem actorSystem;
75     private final EventExecutor eventExecutor;
76     private final NetconfClientDispatcher clientDispatcher;
77     private final String topologyId;
78     private final Duration writeTxIdleTimeout;
79     private final DOMMountPointService mountPointService;
80     private final AAAEncryptionService encryptionService;
81     private ListenerRegistration<NetconfTopologyManager> dataChangeListenerRegistration;
82     private String privateKeyPath;
83     private String privateKeyPassphrase;
84
85     public NetconfTopologyManager(final DataBroker dataBroker, final RpcProviderRegistry rpcProviderRegistry,
86                                   final ClusterSingletonServiceProvider clusterSingletonServiceProvider,
87                                   final ScheduledThreadPool keepaliveExecutor, final ThreadPool processingExecutor,
88                                   final ActorSystemProvider actorSystemProvider,
89                                   final EventExecutor eventExecutor, final NetconfClientDispatcher clientDispatcher,
90                                   final String topologyId, final Config config,
91                                   final DOMMountPointService mountPointService,
92                                   final AAAEncryptionService encryptionService) {
93
94         this.dataBroker = Preconditions.checkNotNull(dataBroker);
95         this.rpcProviderRegistry = Preconditions.checkNotNull(rpcProviderRegistry);
96         this.clusterSingletonServiceProvider = Preconditions.checkNotNull(clusterSingletonServiceProvider);
97         this.keepaliveExecutor = Preconditions.checkNotNull(keepaliveExecutor);
98         this.processingExecutor = Preconditions.checkNotNull(processingExecutor);
99         this.actorSystem = Preconditions.checkNotNull(actorSystemProvider).getActorSystem();
100         this.eventExecutor = Preconditions.checkNotNull(eventExecutor);
101         this.clientDispatcher = Preconditions.checkNotNull(clientDispatcher);
102         this.topologyId = Preconditions.checkNotNull(topologyId);
103         this.writeTxIdleTimeout = Duration.apply(config.getWriteTransactionIdleTimeout(), TimeUnit.SECONDS);
104         this.mountPointService = mountPointService;
105         this.encryptionService = Preconditions.checkNotNull(encryptionService);
106
107     }
108
109     // Blueprint init method
110     public void init() {
111         dataChangeListenerRegistration = registerDataTreeChangeListener(topologyId);
112     }
113
114     @Override
115     public void onDataTreeChanged(@Nonnull final Collection<DataTreeModification<Node>> changes) {
116         for (final DataTreeModification<Node> change : changes) {
117             final DataObjectModification<Node> rootNode = change.getRootNode();
118             final InstanceIdentifier<Node> dataModifIdent = change.getRootPath().getRootIdentifier();
119             final NodeId nodeId = NetconfTopologyUtils.getNodeId(rootNode.getIdentifier());
120             switch (rootNode.getModificationType()) {
121                 case SUBTREE_MODIFIED:
122                     LOG.debug("Config for node {} updated", nodeId);
123                     refreshNetconfDeviceContext(dataModifIdent, rootNode.getDataAfter());
124                     break;
125                 case WRITE:
126                     if (contexts.containsKey(dataModifIdent)) {
127                         LOG.debug("RemoteDevice{{}} was already configured, reconfiguring node...", nodeId);
128                         refreshNetconfDeviceContext(dataModifIdent, rootNode.getDataAfter());
129                     } else {
130                         LOG.debug("Config for node {} created", nodeId);
131                         startNetconfDeviceContext(dataModifIdent, rootNode.getDataAfter());
132                     }
133                     break;
134                 case DELETE:
135                     LOG.debug("Config for node {} deleted", nodeId);
136                     stopNetconfDeviceContext(dataModifIdent);
137                     break;
138                 default:
139                     LOG.warn("Unknown operation for {}.", nodeId);
140             }
141         }
142     }
143
144     private void refreshNetconfDeviceContext(final InstanceIdentifier<Node> instanceIdentifier, final Node node) {
145         final NetconfTopologyContext context = contexts.get(instanceIdentifier);
146         context.refresh(createSetup(instanceIdentifier, node));
147     }
148
149     // ClusterSingletonServiceRegistration registerClusterSingletonService method throws a Runtime exception if there
150     // are problems with registration and client has to deal with it. Only thing we can do if this error occurs is to
151     // retry registration several times and log the error.
152     // TODO change to a specific documented Exception when changed in ClusterSingletonServiceProvider
153     @SuppressWarnings("checkstyle:IllegalCatch")
154     private void startNetconfDeviceContext(final InstanceIdentifier<Node> instanceIdentifier, final Node node) {
155         final NetconfNode netconfNode = node.getAugmentation(NetconfNode.class);
156         Preconditions.checkNotNull(netconfNode);
157         Preconditions.checkNotNull(netconfNode.getHost());
158         Preconditions.checkNotNull(netconfNode.getHost().getIpAddress());
159
160         final Timeout actorResponseWaitTime = new Timeout(Duration.create(netconfNode.getActorResponseWaitTime(),
161                 "seconds"));
162
163         final ServiceGroupIdentifier serviceGroupIdent =
164                 ServiceGroupIdentifier.create(instanceIdentifier.toString());
165
166         final NetconfTopologyContext newNetconfTopologyContext =
167                 new NetconfTopologyContext(createSetup(instanceIdentifier, node), serviceGroupIdent,
168                         actorResponseWaitTime, mountPointService);
169
170         int tries = 3;
171         while (true) {
172             try {
173                 final ClusterSingletonServiceRegistration clusterSingletonServiceRegistration =
174                         clusterSingletonServiceProvider.registerClusterSingletonService(newNetconfTopologyContext);
175                 clusterRegistrations.put(instanceIdentifier, clusterSingletonServiceRegistration);
176                 contexts.put(instanceIdentifier, newNetconfTopologyContext);
177                 break;
178             } catch (final RuntimeException e) {
179                 LOG.warn("Unable to register cluster singleton service {}, trying again", newNetconfTopologyContext, e);
180
181                 if (--tries <= 0) {
182                     LOG.error("Unable to register cluster singleton service {} - done trying, closing topology context",
183                             newNetconfTopologyContext, e);
184                     close();
185                     break;
186                 }
187             }
188         }
189
190     }
191
192     @SuppressWarnings("checkstyle:IllegalCatch")
193     private void stopNetconfDeviceContext(final InstanceIdentifier<Node> instanceIdentifier) {
194         if (contexts.containsKey(instanceIdentifier)) {
195             try {
196                 clusterRegistrations.get(instanceIdentifier).close();
197                 contexts.get(instanceIdentifier).closeFinal();
198             } catch (final Exception e) {
199                 LOG.warn("Error at closing topology context. InstanceIdentifier: " + instanceIdentifier);
200             }
201             contexts.remove(instanceIdentifier);
202             clusterRegistrations.remove(instanceIdentifier);
203         }
204     }
205
206     @SuppressWarnings("checkstyle:IllegalCatch")
207     @Override
208     public void close() {
209         if (dataChangeListenerRegistration != null) {
210             dataChangeListenerRegistration.close();
211             dataChangeListenerRegistration = null;
212         }
213         contexts.forEach((instanceIdentifier, netconfTopologyContext) -> {
214             try {
215                 netconfTopologyContext.closeFinal();
216             } catch (final Exception e) {
217                 LOG.error("Error at closing topology context. InstanceIdentifier: " + instanceIdentifier, e);
218             }
219         });
220         clusterRegistrations.forEach((instanceIdentifier, clusterSingletonServiceRegistration) -> {
221             try {
222                 clusterSingletonServiceRegistration.close();
223             } catch (final Exception e) {
224                 LOG.error("Error at unregistering from cluster. InstanceIdentifier: " + instanceIdentifier, e);
225             }
226         });
227         contexts.clear();
228         clusterRegistrations.clear();
229     }
230
231     /**
232      * Sets the private key path from location specified in configuration file using blueprint.
233      */
234     public void setPrivateKeyPath(String privateKeyPath) {
235         this.privateKeyPath = privateKeyPath;
236     }
237
238     /**
239      * Sets the private key passphrase from location specified in configuration file using blueprint.
240      */
241     public void setPrivateKeyPassphrase(String privateKeyPassphrase) {
242         this.privateKeyPassphrase = privateKeyPassphrase;
243     }
244
245     private ListenerRegistration<NetconfTopologyManager> registerDataTreeChangeListener(final String topologyId) {
246         final WriteTransaction wtx = dataBroker.newWriteOnlyTransaction();
247         initTopology(wtx, LogicalDatastoreType.CONFIGURATION, topologyId);
248         initTopology(wtx, LogicalDatastoreType.OPERATIONAL, topologyId);
249         Futures.addCallback(wtx.submit(), new FutureCallback<Void>() {
250             @Override
251             public void onSuccess(final Void result) {
252                 LOG.debug("topology initialization successful");
253             }
254
255             @Override
256             public void onFailure(@Nonnull final Throwable throwable) {
257                 LOG.error("Unable to initialize netconf-topology, {}", throwable);
258             }
259         }, MoreExecutors.directExecutor());
260
261         LOG.debug("Registering datastore listener");
262         return dataBroker.registerDataTreeChangeListener(
263                 new DataTreeIdentifier<>(LogicalDatastoreType.CONFIGURATION,
264                         NetconfTopologyUtils.createTopologyListPath(topologyId).child(Node.class)), this);
265     }
266
267     private void initTopology(final WriteTransaction wtx, final LogicalDatastoreType datastoreType,
268                               final String topologyId) {
269         final NetworkTopology networkTopology = new NetworkTopologyBuilder().build();
270         final InstanceIdentifier<NetworkTopology> networkTopologyId =
271                 InstanceIdentifier.builder(NetworkTopology.class).build();
272         wtx.merge(datastoreType, networkTopologyId, networkTopology);
273         final Topology topology = new TopologyBuilder().setTopologyId(new TopologyId(topologyId)).build();
274         wtx.merge(datastoreType, networkTopologyId.child(Topology.class,
275                 new TopologyKey(new TopologyId(topologyId))), topology);
276     }
277
278     private NetconfTopologySetup createSetup(final InstanceIdentifier<Node> instanceIdentifier, final Node node) {
279         final NetconfTopologySetupBuilder builder = NetconfTopologySetupBuilder.create()
280                 .setClusterSingletonServiceProvider(clusterSingletonServiceProvider)
281                 .setDataBroker(dataBroker)
282                 .setInstanceIdentifier(instanceIdentifier)
283                 .setRpcProviderRegistry(rpcProviderRegistry)
284                 .setNode(node)
285                 .setActorSystem(actorSystem)
286                 .setEventExecutor(eventExecutor)
287                 .setKeepaliveExecutor(keepaliveExecutor)
288                 .setProcessingExecutor(processingExecutor)
289                 .setTopologyId(topologyId)
290                 .setNetconfClientDispatcher(clientDispatcher)
291                 .setSchemaResourceDTO(NetconfTopologyUtils.setupSchemaCacheDTO(node))
292                 .setIdleTimeout(writeTxIdleTimeout)
293                 .setPrivateKeyPath(privateKeyPath)
294                 .setPrivateKeyPassphrase(privateKeyPassphrase)
295                 .setEncryptionService(encryptionService);
296
297         return builder.build();
298     }
299 }