Merge changes Id268d747,Id095a4ff
[netconf.git] / netconf / netconf-topology-singleton / src / main / java / org / opendaylight / netconf / topology / singleton / impl / NetconfTopologyManager.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.netconf.topology.singleton.impl;
10
11 import akka.actor.ActorSystem;
12 import akka.util.Timeout;
13 import com.google.common.annotations.VisibleForTesting;
14 import com.google.common.base.Preconditions;
15 import com.google.common.util.concurrent.FutureCallback;
16 import com.google.common.util.concurrent.Futures;
17 import com.google.common.util.concurrent.MoreExecutors;
18 import io.netty.util.concurrent.EventExecutor;
19 import java.util.Collection;
20 import java.util.Map;
21 import java.util.concurrent.ConcurrentHashMap;
22 import java.util.concurrent.TimeUnit;
23 import javax.annotation.Nonnull;
24 import org.opendaylight.aaa.encrypt.AAAEncryptionService;
25 import org.opendaylight.controller.cluster.ActorSystemProvider;
26 import org.opendaylight.controller.config.threadpool.ScheduledThreadPool;
27 import org.opendaylight.controller.config.threadpool.ThreadPool;
28 import org.opendaylight.controller.md.sal.binding.api.ClusteredDataTreeChangeListener;
29 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
30 import org.opendaylight.controller.md.sal.binding.api.DataObjectModification;
31 import org.opendaylight.controller.md.sal.binding.api.DataTreeIdentifier;
32 import org.opendaylight.controller.md.sal.binding.api.DataTreeModification;
33 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
34 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
35 import org.opendaylight.controller.md.sal.dom.api.DOMMountPointService;
36 import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
37 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
38 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceRegistration;
39 import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier;
40 import org.opendaylight.netconf.client.NetconfClientDispatcher;
41 import org.opendaylight.netconf.topology.singleton.api.NetconfTopologySingletonService;
42 import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup;
43 import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup.NetconfTopologySetupBuilder;
44 import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologyUtils;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNode;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.topology.singleton.config.rev170419.Config;
47 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology;
48 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopologyBuilder;
49 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
50 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.TopologyId;
51 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
52 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyBuilder;
53 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey;
54 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
55 import org.opendaylight.yangtools.concepts.ListenerRegistration;
56 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
57 import org.slf4j.Logger;
58 import org.slf4j.LoggerFactory;
59 import scala.concurrent.duration.Duration;
60
61 public class NetconfTopologyManager
62         implements ClusteredDataTreeChangeListener<Node>, NetconfTopologySingletonService, AutoCloseable {
63
64     private static final Logger LOG = LoggerFactory.getLogger(NetconfTopologyManager.class);
65
66     private final Map<InstanceIdentifier<Node>, NetconfTopologyContext> contexts = new ConcurrentHashMap<>();
67     private final Map<InstanceIdentifier<Node>, ClusterSingletonServiceRegistration>
68             clusterRegistrations = new ConcurrentHashMap<>();
69
70     private final DataBroker dataBroker;
71     private final RpcProviderRegistry rpcProviderRegistry;
72     private final ClusterSingletonServiceProvider clusterSingletonServiceProvider;
73     private final ScheduledThreadPool keepaliveExecutor;
74     private final ThreadPool processingExecutor;
75     private final ActorSystem actorSystem;
76     private final EventExecutor eventExecutor;
77     private final NetconfClientDispatcher clientDispatcher;
78     private final String topologyId;
79     private final Duration writeTxIdleTimeout;
80     private final DOMMountPointService mountPointService;
81     private final AAAEncryptionService encryptionService;
82     private ListenerRegistration<NetconfTopologyManager> dataChangeListenerRegistration;
83     private String privateKeyPath;
84     private String privateKeyPassphrase;
85
86     public NetconfTopologyManager(final DataBroker dataBroker, final RpcProviderRegistry rpcProviderRegistry,
87                                   final ClusterSingletonServiceProvider clusterSingletonServiceProvider,
88                                   final ScheduledThreadPool keepaliveExecutor, final ThreadPool processingExecutor,
89                                   final ActorSystemProvider actorSystemProvider,
90                                   final EventExecutor eventExecutor, final NetconfClientDispatcher clientDispatcher,
91                                   final String topologyId, final Config config,
92                                   final DOMMountPointService mountPointService,
93                                   final AAAEncryptionService encryptionService) {
94
95         this.dataBroker = Preconditions.checkNotNull(dataBroker);
96         this.rpcProviderRegistry = Preconditions.checkNotNull(rpcProviderRegistry);
97         this.clusterSingletonServiceProvider = Preconditions.checkNotNull(clusterSingletonServiceProvider);
98         this.keepaliveExecutor = Preconditions.checkNotNull(keepaliveExecutor);
99         this.processingExecutor = Preconditions.checkNotNull(processingExecutor);
100         this.actorSystem = Preconditions.checkNotNull(actorSystemProvider).getActorSystem();
101         this.eventExecutor = Preconditions.checkNotNull(eventExecutor);
102         this.clientDispatcher = Preconditions.checkNotNull(clientDispatcher);
103         this.topologyId = Preconditions.checkNotNull(topologyId);
104         this.writeTxIdleTimeout = Duration.apply(config.getWriteTransactionIdleTimeout(), TimeUnit.SECONDS);
105         this.mountPointService = mountPointService;
106         this.encryptionService = Preconditions.checkNotNull(encryptionService);
107
108     }
109
110     // Blueprint init method
111     public void init() {
112         dataChangeListenerRegistration = registerDataTreeChangeListener();
113     }
114
115     @Override
116     public void onDataTreeChanged(@Nonnull final Collection<DataTreeModification<Node>> changes) {
117         for (final DataTreeModification<Node> change : changes) {
118             final DataObjectModification<Node> rootNode = change.getRootNode();
119             final InstanceIdentifier<Node> dataModifIdent = change.getRootPath().getRootIdentifier();
120             final NodeId nodeId = NetconfTopologyUtils.getNodeId(rootNode.getIdentifier());
121             switch (rootNode.getModificationType()) {
122                 case SUBTREE_MODIFIED:
123                     LOG.debug("Config for node {} updated", nodeId);
124                     refreshNetconfDeviceContext(dataModifIdent, rootNode.getDataAfter());
125                     break;
126                 case WRITE:
127                     if (contexts.containsKey(dataModifIdent)) {
128                         LOG.debug("RemoteDevice{{}} was already configured, reconfiguring node...", nodeId);
129                         refreshNetconfDeviceContext(dataModifIdent, rootNode.getDataAfter());
130                     } else {
131                         LOG.debug("Config for node {} created", nodeId);
132                         startNetconfDeviceContext(dataModifIdent, rootNode.getDataAfter());
133                     }
134                     break;
135                 case DELETE:
136                     LOG.debug("Config for node {} deleted", nodeId);
137                     stopNetconfDeviceContext(dataModifIdent);
138                     break;
139                 default:
140                     LOG.warn("Unknown operation for {}.", nodeId);
141             }
142         }
143     }
144
145     private void refreshNetconfDeviceContext(final InstanceIdentifier<Node> instanceIdentifier, final Node node) {
146         final NetconfTopologyContext context = contexts.get(instanceIdentifier);
147         context.refresh(createSetup(instanceIdentifier, node));
148     }
149
150     // ClusterSingletonServiceRegistration registerClusterSingletonService method throws a Runtime exception if there
151     // are problems with registration and client has to deal with it. Only thing we can do if this error occurs is to
152     // retry registration several times and log the error.
153     // TODO change to a specific documented Exception when changed in ClusterSingletonServiceProvider
154     @SuppressWarnings("checkstyle:IllegalCatch")
155     private void startNetconfDeviceContext(final InstanceIdentifier<Node> instanceIdentifier, final Node node) {
156         final NetconfNode netconfNode = node.getAugmentation(NetconfNode.class);
157         Preconditions.checkNotNull(netconfNode);
158         Preconditions.checkNotNull(netconfNode.getHost());
159         Preconditions.checkNotNull(netconfNode.getHost().getIpAddress());
160
161         final Timeout actorResponseWaitTime = new Timeout(Duration.create(netconfNode.getActorResponseWaitTime(),
162                 "seconds"));
163
164         final ServiceGroupIdentifier serviceGroupIdent =
165                 ServiceGroupIdentifier.create(instanceIdentifier.toString());
166
167         final NetconfTopologyContext newNetconfTopologyContext = newNetconfTopologyContext(
168                 createSetup(instanceIdentifier, node), serviceGroupIdent, actorResponseWaitTime);
169
170         int tries = 3;
171         while (true) {
172             try {
173                 final ClusterSingletonServiceRegistration clusterSingletonServiceRegistration =
174                         clusterSingletonServiceProvider.registerClusterSingletonService(newNetconfTopologyContext);
175                 clusterRegistrations.put(instanceIdentifier, clusterSingletonServiceRegistration);
176                 contexts.put(instanceIdentifier, newNetconfTopologyContext);
177                 break;
178             } catch (final RuntimeException e) {
179                 LOG.warn("Unable to register cluster singleton service {}, trying again", newNetconfTopologyContext, e);
180
181                 if (--tries <= 0) {
182                     LOG.error("Unable to register cluster singleton service {} - done trying, closing topology context",
183                             newNetconfTopologyContext, e);
184                     close(newNetconfTopologyContext);
185                     break;
186                 }
187             }
188         }
189     }
190
191     private void stopNetconfDeviceContext(final InstanceIdentifier<Node> instanceIdentifier) {
192         final NetconfTopologyContext netconfTopologyContext = contexts.remove(instanceIdentifier);
193         if (netconfTopologyContext != null) {
194             close(clusterRegistrations.remove(instanceIdentifier));
195             close(netconfTopologyContext);
196         }
197     }
198
199     @VisibleForTesting
200     protected NetconfTopologyContext newNetconfTopologyContext(NetconfTopologySetup setup,
201             ServiceGroupIdentifier serviceGroupIdent, Timeout actorResponseWaitTime) {
202         return new NetconfTopologyContext(setup, serviceGroupIdent, actorResponseWaitTime, mountPointService);
203     }
204
205     @Override
206     public void close() {
207         if (dataChangeListenerRegistration != null) {
208             dataChangeListenerRegistration.close();
209             dataChangeListenerRegistration = null;
210         }
211
212         contexts.values().forEach(netconfTopologyContext -> close(netconfTopologyContext));
213
214         clusterRegistrations.values().forEach(
215             clusterSingletonServiceRegistration -> close(clusterSingletonServiceRegistration));
216
217         contexts.clear();
218         clusterRegistrations.clear();
219     }
220
221     @SuppressWarnings("checkstyle:IllegalCatch")
222     private void close(AutoCloseable closeable) {
223         try {
224             closeable.close();
225         } catch (Exception e) {
226             LOG.warn("Error closing {}", closeable, e);
227         }
228     }
229
230     /**
231      * Sets the private key path from location specified in configuration file using blueprint.
232      */
233     public void setPrivateKeyPath(String privateKeyPath) {
234         this.privateKeyPath = privateKeyPath;
235     }
236
237     /**
238      * Sets the private key passphrase from location specified in configuration file using blueprint.
239      */
240     public void setPrivateKeyPassphrase(String privateKeyPassphrase) {
241         this.privateKeyPassphrase = privateKeyPassphrase;
242     }
243
244     private ListenerRegistration<NetconfTopologyManager> registerDataTreeChangeListener() {
245         final WriteTransaction wtx = dataBroker.newWriteOnlyTransaction();
246         initTopology(wtx, LogicalDatastoreType.CONFIGURATION);
247         initTopology(wtx, LogicalDatastoreType.OPERATIONAL);
248         Futures.addCallback(wtx.submit(), new FutureCallback<Void>() {
249             @Override
250             public void onSuccess(final Void result) {
251                 LOG.debug("topology initialization successful");
252             }
253
254             @Override
255             public void onFailure(@Nonnull final Throwable throwable) {
256                 LOG.error("Unable to initialize netconf-topology, {}", throwable);
257             }
258         }, MoreExecutors.directExecutor());
259
260         LOG.debug("Registering datastore listener");
261         return dataBroker.registerDataTreeChangeListener(
262                 new DataTreeIdentifier<>(LogicalDatastoreType.CONFIGURATION,
263                         NetconfTopologyUtils.createTopologyListPath(topologyId).child(Node.class)), this);
264     }
265
266     private void initTopology(final WriteTransaction wtx, final LogicalDatastoreType datastoreType) {
267         final NetworkTopology networkTopology = new NetworkTopologyBuilder().build();
268         final InstanceIdentifier<NetworkTopology> networkTopologyId =
269                 InstanceIdentifier.builder(NetworkTopology.class).build();
270         wtx.merge(datastoreType, networkTopologyId, networkTopology);
271         final Topology topology = new TopologyBuilder().setTopologyId(new TopologyId(topologyId)).build();
272         wtx.merge(datastoreType, networkTopologyId.child(Topology.class,
273                 new TopologyKey(new TopologyId(topologyId))), topology);
274     }
275
276     private NetconfTopologySetup createSetup(final InstanceIdentifier<Node> instanceIdentifier, final Node node) {
277         final NetconfTopologySetupBuilder builder = NetconfTopologySetupBuilder.create()
278                 .setClusterSingletonServiceProvider(clusterSingletonServiceProvider)
279                 .setDataBroker(dataBroker)
280                 .setInstanceIdentifier(instanceIdentifier)
281                 .setRpcProviderRegistry(rpcProviderRegistry)
282                 .setNode(node)
283                 .setActorSystem(actorSystem)
284                 .setEventExecutor(eventExecutor)
285                 .setKeepaliveExecutor(keepaliveExecutor)
286                 .setProcessingExecutor(processingExecutor)
287                 .setTopologyId(topologyId)
288                 .setNetconfClientDispatcher(clientDispatcher)
289                 .setSchemaResourceDTO(NetconfTopologyUtils.setupSchemaCacheDTO(node))
290                 .setIdleTimeout(writeTxIdleTimeout)
291                 .setPrivateKeyPath(privateKeyPath)
292                 .setPrivateKeyPassphrase(privateKeyPassphrase)
293                 .setEncryptionService(encryptionService);
294
295         return builder.build();
296     }
297 }