f63d4114b69d6426afbb02a229b4931b6c0bb0ad
[netconf.git] / netconf / netconf-topology-singleton / src / main / java / org / opendaylight / netconf / topology / singleton / impl / NetconfTopologyManager.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.netconf.topology.singleton.impl;
9
10 import akka.actor.ActorSystem;
11 import akka.util.Timeout;
12 import com.google.common.annotations.VisibleForTesting;
13 import com.google.common.base.Preconditions;
14 import com.google.common.util.concurrent.FutureCallback;
15 import com.google.common.util.concurrent.ListeningExecutorService;
16 import com.google.common.util.concurrent.MoreExecutors;
17 import io.netty.util.concurrent.EventExecutor;
18 import java.util.Collection;
19 import java.util.Map;
20 import java.util.concurrent.ConcurrentHashMap;
21 import java.util.concurrent.ScheduledExecutorService;
22 import java.util.concurrent.TimeUnit;
23 import javax.annotation.Nonnull;
24 import org.opendaylight.aaa.encrypt.AAAEncryptionService;
25 import org.opendaylight.controller.cluster.ActorSystemProvider;
26 import org.opendaylight.controller.config.threadpool.ScheduledThreadPool;
27 import org.opendaylight.controller.config.threadpool.ThreadPool;
28 import org.opendaylight.mdsal.binding.api.ClusteredDataTreeChangeListener;
29 import org.opendaylight.mdsal.binding.api.DataBroker;
30 import org.opendaylight.mdsal.binding.api.DataObjectModification;
31 import org.opendaylight.mdsal.binding.api.DataTreeIdentifier;
32 import org.opendaylight.mdsal.binding.api.DataTreeModification;
33 import org.opendaylight.mdsal.binding.api.WriteTransaction;
34 import org.opendaylight.mdsal.common.api.CommitInfo;
35 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
36 import org.opendaylight.mdsal.dom.api.DOMMountPointService;
37 import org.opendaylight.mdsal.dom.api.DOMRpcProviderService;
38 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
39 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceRegistration;
40 import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier;
41 import org.opendaylight.netconf.client.NetconfClientDispatcher;
42 import org.opendaylight.netconf.topology.singleton.api.NetconfTopologySingletonService;
43 import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup;
44 import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup.NetconfTopologySetupBuilder;
45 import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologyUtils;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNode;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.topology.singleton.config.rev170419.Config;
48 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology;
49 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopologyBuilder;
50 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
51 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.TopologyId;
52 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
53 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyBuilder;
54 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey;
55 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
56 import org.opendaylight.yangtools.concepts.ListenerRegistration;
57 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
58 import org.slf4j.Logger;
59 import org.slf4j.LoggerFactory;
60 import scala.concurrent.duration.Duration;
61
62 public class NetconfTopologyManager
63         implements ClusteredDataTreeChangeListener<Node>, NetconfTopologySingletonService, AutoCloseable {
64
65     private static final Logger LOG = LoggerFactory.getLogger(NetconfTopologyManager.class);
66
67     private final Map<InstanceIdentifier<Node>, NetconfTopologyContext> contexts = new ConcurrentHashMap<>();
68     private final Map<InstanceIdentifier<Node>, ClusterSingletonServiceRegistration>
69             clusterRegistrations = new ConcurrentHashMap<>();
70
71     private final DataBroker dataBroker;
72     private final DOMRpcProviderService rpcProviderRegistry;
73     private final ClusterSingletonServiceProvider clusterSingletonServiceProvider;
74     private final ScheduledExecutorService keepaliveExecutor;
75     private final ListeningExecutorService processingExecutor;
76     private final ActorSystem actorSystem;
77     private final EventExecutor eventExecutor;
78     private final NetconfClientDispatcher clientDispatcher;
79     private final String topologyId;
80     private final Duration writeTxIdleTimeout;
81     private final DOMMountPointService mountPointService;
82     private final AAAEncryptionService encryptionService;
83     private ListenerRegistration<NetconfTopologyManager> dataChangeListenerRegistration;
84     private String privateKeyPath;
85     private String privateKeyPassphrase;
86
87     public NetconfTopologyManager(final DataBroker dataBroker, final DOMRpcProviderService rpcProviderRegistry,
88                                   final ClusterSingletonServiceProvider clusterSingletonServiceProvider,
89                                   final ScheduledThreadPool keepaliveExecutor, final ThreadPool processingExecutor,
90                                   final ActorSystemProvider actorSystemProvider,
91                                   final EventExecutor eventExecutor, final NetconfClientDispatcher clientDispatcher,
92                                   final String topologyId, final Config config,
93                                   final DOMMountPointService mountPointService,
94                                   final AAAEncryptionService encryptionService) {
95
96         this.dataBroker = Preconditions.checkNotNull(dataBroker);
97         this.rpcProviderRegistry = Preconditions.checkNotNull(rpcProviderRegistry);
98         this.clusterSingletonServiceProvider = Preconditions.checkNotNull(clusterSingletonServiceProvider);
99         this.keepaliveExecutor = keepaliveExecutor.getExecutor();
100         this.processingExecutor = MoreExecutors.listeningDecorator(processingExecutor.getExecutor());
101         this.actorSystem = Preconditions.checkNotNull(actorSystemProvider).getActorSystem();
102         this.eventExecutor = Preconditions.checkNotNull(eventExecutor);
103         this.clientDispatcher = Preconditions.checkNotNull(clientDispatcher);
104         this.topologyId = Preconditions.checkNotNull(topologyId);
105         this.writeTxIdleTimeout = Duration.apply(config.getWriteTransactionIdleTimeout(), TimeUnit.SECONDS);
106         this.mountPointService = mountPointService;
107         this.encryptionService = Preconditions.checkNotNull(encryptionService);
108
109     }
110
111     // Blueprint init method
112     public void init() {
113         dataChangeListenerRegistration = registerDataTreeChangeListener();
114     }
115
116     @Override
117     public void onDataTreeChanged(@Nonnull final Collection<DataTreeModification<Node>> changes) {
118         for (final DataTreeModification<Node> change : changes) {
119             final DataObjectModification<Node> rootNode = change.getRootNode();
120             final InstanceIdentifier<Node> dataModifIdent = change.getRootPath().getRootIdentifier();
121             final NodeId nodeId = NetconfTopologyUtils.getNodeId(rootNode.getIdentifier());
122             switch (rootNode.getModificationType()) {
123                 case SUBTREE_MODIFIED:
124                     LOG.debug("Config for node {} updated", nodeId);
125                     refreshNetconfDeviceContext(dataModifIdent, rootNode.getDataAfter());
126                     break;
127                 case WRITE:
128                     if (contexts.containsKey(dataModifIdent)) {
129                         LOG.debug("RemoteDevice{{}} was already configured, reconfiguring node...", nodeId);
130                         refreshNetconfDeviceContext(dataModifIdent, rootNode.getDataAfter());
131                     } else {
132                         LOG.debug("Config for node {} created", nodeId);
133                         startNetconfDeviceContext(dataModifIdent, rootNode.getDataAfter());
134                     }
135                     break;
136                 case DELETE:
137                     LOG.debug("Config for node {} deleted", nodeId);
138                     stopNetconfDeviceContext(dataModifIdent);
139                     break;
140                 default:
141                     LOG.warn("Unknown operation for {}.", nodeId);
142             }
143         }
144     }
145
146     private void refreshNetconfDeviceContext(final InstanceIdentifier<Node> instanceIdentifier, final Node node) {
147         final NetconfTopologyContext context = contexts.get(instanceIdentifier);
148         context.refresh(createSetup(instanceIdentifier, node));
149     }
150
151     // ClusterSingletonServiceRegistration registerClusterSingletonService method throws a Runtime exception if there
152     // are problems with registration and client has to deal with it. Only thing we can do if this error occurs is to
153     // retry registration several times and log the error.
154     // TODO change to a specific documented Exception when changed in ClusterSingletonServiceProvider
155     @SuppressWarnings("checkstyle:IllegalCatch")
156     private void startNetconfDeviceContext(final InstanceIdentifier<Node> instanceIdentifier, final Node node) {
157         final NetconfNode netconfNode = node.augmentation(NetconfNode.class);
158         Preconditions.checkNotNull(netconfNode);
159         Preconditions.checkNotNull(netconfNode.getHost());
160         Preconditions.checkNotNull(netconfNode.getHost().getIpAddress());
161
162         final Timeout actorResponseWaitTime = new Timeout(Duration.create(netconfNode.getActorResponseWaitTime(),
163                 "seconds"));
164
165         final ServiceGroupIdentifier serviceGroupIdent =
166                 ServiceGroupIdentifier.create(instanceIdentifier.toString());
167
168         final NetconfTopologyContext newNetconfTopologyContext = newNetconfTopologyContext(
169                 createSetup(instanceIdentifier, node), serviceGroupIdent, actorResponseWaitTime);
170
171         int tries = 3;
172         while (true) {
173             try {
174                 final ClusterSingletonServiceRegistration clusterSingletonServiceRegistration =
175                         clusterSingletonServiceProvider.registerClusterSingletonService(newNetconfTopologyContext);
176                 clusterRegistrations.put(instanceIdentifier, clusterSingletonServiceRegistration);
177                 contexts.put(instanceIdentifier, newNetconfTopologyContext);
178                 break;
179             } catch (final RuntimeException e) {
180                 LOG.warn("Unable to register cluster singleton service {}, trying again", newNetconfTopologyContext, e);
181
182                 if (--tries <= 0) {
183                     LOG.error("Unable to register cluster singleton service {} - done trying, closing topology context",
184                             newNetconfTopologyContext, e);
185                     close(newNetconfTopologyContext);
186                     break;
187                 }
188             }
189         }
190     }
191
192     private void stopNetconfDeviceContext(final InstanceIdentifier<Node> instanceIdentifier) {
193         final NetconfTopologyContext netconfTopologyContext = contexts.remove(instanceIdentifier);
194         if (netconfTopologyContext != null) {
195             close(clusterRegistrations.remove(instanceIdentifier));
196             close(netconfTopologyContext);
197         }
198     }
199
200     @VisibleForTesting
201     protected NetconfTopologyContext newNetconfTopologyContext(final NetconfTopologySetup setup,
202             final ServiceGroupIdentifier serviceGroupIdent, final Timeout actorResponseWaitTime) {
203         return new NetconfTopologyContext(setup, serviceGroupIdent, actorResponseWaitTime, mountPointService);
204     }
205
206     @Override
207     public void close() {
208         if (dataChangeListenerRegistration != null) {
209             dataChangeListenerRegistration.close();
210             dataChangeListenerRegistration = null;
211         }
212
213         contexts.values().forEach(NetconfTopologyManager::close);
214         clusterRegistrations.values().forEach(NetconfTopologyManager::close);
215
216         contexts.clear();
217         clusterRegistrations.clear();
218     }
219
220     @SuppressWarnings("checkstyle:IllegalCatch")
221     private static void close(final AutoCloseable closeable) {
222         try {
223             closeable.close();
224         } catch (Exception e) {
225             LOG.warn("Error closing {}", closeable, e);
226         }
227     }
228
229     /**
230      * Sets the private key path from location specified in configuration file using blueprint.
231      */
232     public void setPrivateKeyPath(final String privateKeyPath) {
233         this.privateKeyPath = privateKeyPath;
234     }
235
236     /**
237      * Sets the private key passphrase from location specified in configuration file using blueprint.
238      */
239     public void setPrivateKeyPassphrase(final String privateKeyPassphrase) {
240         this.privateKeyPassphrase = privateKeyPassphrase;
241     }
242
243     private ListenerRegistration<NetconfTopologyManager> registerDataTreeChangeListener() {
244         final WriteTransaction wtx = dataBroker.newWriteOnlyTransaction();
245         initTopology(wtx, LogicalDatastoreType.CONFIGURATION);
246         initTopology(wtx, LogicalDatastoreType.OPERATIONAL);
247         wtx.commit().addCallback(new FutureCallback<CommitInfo>() {
248             @Override
249             public void onSuccess(final CommitInfo result) {
250                 LOG.debug("topology initialization successful");
251             }
252
253             @Override
254             public void onFailure(@Nonnull final Throwable throwable) {
255                 LOG.error("Unable to initialize netconf-topology, {}", throwable);
256             }
257         }, MoreExecutors.directExecutor());
258
259         LOG.debug("Registering datastore listener");
260         return dataBroker.registerDataTreeChangeListener(DataTreeIdentifier.create(LogicalDatastoreType.CONFIGURATION,
261             NetconfTopologyUtils.createTopologyListPath(topologyId).child(Node.class)), this);
262     }
263
264     private void initTopology(final WriteTransaction wtx, final LogicalDatastoreType datastoreType) {
265         final NetworkTopology networkTopology = new NetworkTopologyBuilder().build();
266         final InstanceIdentifier<NetworkTopology> networkTopologyId =
267                 InstanceIdentifier.builder(NetworkTopology.class).build();
268         wtx.merge(datastoreType, networkTopologyId, networkTopology);
269         final Topology topology = new TopologyBuilder().setTopologyId(new TopologyId(topologyId)).build();
270         wtx.merge(datastoreType, networkTopologyId.child(Topology.class,
271                 new TopologyKey(new TopologyId(topologyId))), topology);
272     }
273
274     private NetconfTopologySetup createSetup(final InstanceIdentifier<Node> instanceIdentifier, final Node node) {
275         final NetconfTopologySetupBuilder builder = NetconfTopologySetupBuilder.create()
276                 .setClusterSingletonServiceProvider(clusterSingletonServiceProvider)
277                 .setDataBroker(dataBroker)
278                 .setInstanceIdentifier(instanceIdentifier)
279                 .setRpcProviderRegistry(rpcProviderRegistry)
280                 .setNode(node)
281                 .setActorSystem(actorSystem)
282                 .setEventExecutor(eventExecutor)
283                 .setKeepaliveExecutor(keepaliveExecutor)
284                 .setProcessingExecutor(processingExecutor)
285                 .setTopologyId(topologyId)
286                 .setNetconfClientDispatcher(clientDispatcher)
287                 .setSchemaResourceDTO(NetconfTopologyUtils.setupSchemaCacheDTO(node))
288                 .setIdleTimeout(writeTxIdleTimeout)
289                 .setPrivateKeyPath(privateKeyPath)
290                 .setPrivateKeyPassphrase(privateKeyPassphrase)
291                 .setEncryptionService(encryptionService);
292
293         return builder.build();
294     }
295 }