2 * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.netconf.topology.singleton.impl;
11 import akka.actor.ActorSystem;
12 import akka.util.Timeout;
13 import com.google.common.base.Preconditions;
14 import com.google.common.util.concurrent.FutureCallback;
15 import com.google.common.util.concurrent.Futures;
16 import com.google.common.util.concurrent.MoreExecutors;
17 import io.netty.util.concurrent.EventExecutor;
18 import java.util.Collection;
19 import java.util.HashMap;
21 import java.util.concurrent.TimeUnit;
22 import javax.annotation.Nonnull;
23 import org.opendaylight.aaa.encrypt.AAAEncryptionService;
24 import org.opendaylight.controller.cluster.ActorSystemProvider;
25 import org.opendaylight.controller.config.threadpool.ScheduledThreadPool;
26 import org.opendaylight.controller.config.threadpool.ThreadPool;
27 import org.opendaylight.controller.md.sal.binding.api.ClusteredDataTreeChangeListener;
28 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
29 import org.opendaylight.controller.md.sal.binding.api.DataObjectModification;
30 import org.opendaylight.controller.md.sal.binding.api.DataTreeIdentifier;
31 import org.opendaylight.controller.md.sal.binding.api.DataTreeModification;
32 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
33 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
34 import org.opendaylight.controller.md.sal.dom.api.DOMMountPointService;
35 import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
36 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
37 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceRegistration;
38 import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier;
39 import org.opendaylight.netconf.client.NetconfClientDispatcher;
40 import org.opendaylight.netconf.topology.singleton.api.NetconfTopologySingletonService;
41 import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup;
42 import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup.NetconfTopologySetupBuilder;
43 import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologyUtils;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNode;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.topology.singleton.config.rev170419.Config;
46 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology;
47 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopologyBuilder;
48 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
49 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.TopologyId;
50 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
51 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyBuilder;
52 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey;
53 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
54 import org.opendaylight.yangtools.concepts.ListenerRegistration;
55 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
56 import org.slf4j.Logger;
57 import org.slf4j.LoggerFactory;
58 import scala.concurrent.duration.Duration;
60 public class NetconfTopologyManager
61 implements ClusteredDataTreeChangeListener<Node>, NetconfTopologySingletonService, AutoCloseable {
63 private static final Logger LOG = LoggerFactory.getLogger(NetconfTopologyManager.class);
65 private final Map<InstanceIdentifier<Node>, NetconfTopologyContext> contexts = new HashMap<>();
66 private final Map<InstanceIdentifier<Node>, ClusterSingletonServiceRegistration>
67 clusterRegistrations = new HashMap<>();
69 private final DataBroker dataBroker;
70 private final RpcProviderRegistry rpcProviderRegistry;
71 private final ClusterSingletonServiceProvider clusterSingletonServiceProvider;
72 private final ScheduledThreadPool keepaliveExecutor;
73 private final ThreadPool processingExecutor;
74 private final ActorSystem actorSystem;
75 private final EventExecutor eventExecutor;
76 private final NetconfClientDispatcher clientDispatcher;
77 private final String topologyId;
78 private final Duration writeTxIdleTimeout;
79 private final DOMMountPointService mountPointService;
80 private final AAAEncryptionService encryptionService;
81 private ListenerRegistration<NetconfTopologyManager> dataChangeListenerRegistration;
82 private String privateKeyPath;
83 private String privateKeyPassphrase;
85 public NetconfTopologyManager(final DataBroker dataBroker, final RpcProviderRegistry rpcProviderRegistry,
86 final ClusterSingletonServiceProvider clusterSingletonServiceProvider,
87 final ScheduledThreadPool keepaliveExecutor, final ThreadPool processingExecutor,
88 final ActorSystemProvider actorSystemProvider,
89 final EventExecutor eventExecutor, final NetconfClientDispatcher clientDispatcher,
90 final String topologyId, final Config config,
91 final DOMMountPointService mountPointService,
92 final AAAEncryptionService encryptionService) {
94 this.dataBroker = Preconditions.checkNotNull(dataBroker);
95 this.rpcProviderRegistry = Preconditions.checkNotNull(rpcProviderRegistry);
96 this.clusterSingletonServiceProvider = Preconditions.checkNotNull(clusterSingletonServiceProvider);
97 this.keepaliveExecutor = Preconditions.checkNotNull(keepaliveExecutor);
98 this.processingExecutor = Preconditions.checkNotNull(processingExecutor);
99 this.actorSystem = Preconditions.checkNotNull(actorSystemProvider).getActorSystem();
100 this.eventExecutor = Preconditions.checkNotNull(eventExecutor);
101 this.clientDispatcher = Preconditions.checkNotNull(clientDispatcher);
102 this.topologyId = Preconditions.checkNotNull(topologyId);
103 this.writeTxIdleTimeout = Duration.apply(config.getWriteTransactionIdleTimeout(), TimeUnit.SECONDS);
104 this.mountPointService = mountPointService;
105 this.encryptionService = Preconditions.checkNotNull(encryptionService);
109 // Blueprint init method
111 dataChangeListenerRegistration = registerDataTreeChangeListener(topologyId);
115 public void onDataTreeChanged(@Nonnull final Collection<DataTreeModification<Node>> changes) {
116 for (final DataTreeModification<Node> change : changes) {
117 final DataObjectModification<Node> rootNode = change.getRootNode();
118 final InstanceIdentifier<Node> dataModifIdent = change.getRootPath().getRootIdentifier();
119 final NodeId nodeId = NetconfTopologyUtils.getNodeId(rootNode.getIdentifier());
120 switch (rootNode.getModificationType()) {
121 case SUBTREE_MODIFIED:
122 LOG.debug("Config for node {} updated", nodeId);
123 refreshNetconfDeviceContext(dataModifIdent, rootNode.getDataAfter());
126 if (contexts.containsKey(dataModifIdent)) {
127 LOG.debug("RemoteDevice{{}} was already configured, reconfiguring node...", nodeId);
128 refreshNetconfDeviceContext(dataModifIdent, rootNode.getDataAfter());
130 LOG.debug("Config for node {} created", nodeId);
131 startNetconfDeviceContext(dataModifIdent, rootNode.getDataAfter());
135 LOG.debug("Config for node {} deleted", nodeId);
136 stopNetconfDeviceContext(dataModifIdent);
139 LOG.warn("Unknown operation for {}.", nodeId);
144 private void refreshNetconfDeviceContext(final InstanceIdentifier<Node> instanceIdentifier, final Node node) {
145 final NetconfTopologyContext context = contexts.get(instanceIdentifier);
146 context.refresh(createSetup(instanceIdentifier, node));
149 // ClusterSingletonServiceRegistration registerClusterSingletonService method throws a Runtime exception if there
150 // are problems with registration and client has to deal with it. Only thing we can do if this error occurs is to
151 // retry registration several times and log the error.
152 // TODO change to a specific documented Exception when changed in ClusterSingletonServiceProvider
153 @SuppressWarnings("checkstyle:IllegalCatch")
154 private void startNetconfDeviceContext(final InstanceIdentifier<Node> instanceIdentifier, final Node node) {
155 final NetconfNode netconfNode = node.getAugmentation(NetconfNode.class);
156 Preconditions.checkNotNull(netconfNode);
157 Preconditions.checkNotNull(netconfNode.getHost());
158 Preconditions.checkNotNull(netconfNode.getHost().getIpAddress());
160 final Timeout actorResponseWaitTime = new Timeout(Duration.create(netconfNode.getActorResponseWaitTime(),
163 final ServiceGroupIdentifier serviceGroupIdent =
164 ServiceGroupIdentifier.create(instanceIdentifier.toString());
166 final NetconfTopologyContext newNetconfTopologyContext =
167 new NetconfTopologyContext(createSetup(instanceIdentifier, node), serviceGroupIdent,
168 actorResponseWaitTime, mountPointService);
173 final ClusterSingletonServiceRegistration clusterSingletonServiceRegistration =
174 clusterSingletonServiceProvider.registerClusterSingletonService(newNetconfTopologyContext);
175 clusterRegistrations.put(instanceIdentifier, clusterSingletonServiceRegistration);
176 contexts.put(instanceIdentifier, newNetconfTopologyContext);
178 } catch (final RuntimeException e) {
179 LOG.warn("Unable to register cluster singleton service {}, trying again", newNetconfTopologyContext, e);
182 LOG.error("Unable to register cluster singleton service {} - done trying, closing topology context",
183 newNetconfTopologyContext, e);
192 @SuppressWarnings("checkstyle:IllegalCatch")
193 private void stopNetconfDeviceContext(final InstanceIdentifier<Node> instanceIdentifier) {
194 if (contexts.containsKey(instanceIdentifier)) {
196 clusterRegistrations.get(instanceIdentifier).close();
197 contexts.get(instanceIdentifier).closeFinal();
198 } catch (final Exception e) {
199 LOG.warn("Error at closing topology context. InstanceIdentifier: " + instanceIdentifier);
201 contexts.remove(instanceIdentifier);
202 clusterRegistrations.remove(instanceIdentifier);
206 @SuppressWarnings("checkstyle:IllegalCatch")
208 public void close() {
209 if (dataChangeListenerRegistration != null) {
210 dataChangeListenerRegistration.close();
211 dataChangeListenerRegistration = null;
213 contexts.forEach((instanceIdentifier, netconfTopologyContext) -> {
215 netconfTopologyContext.closeFinal();
216 } catch (final Exception e) {
217 LOG.error("Error at closing topology context. InstanceIdentifier: " + instanceIdentifier, e);
220 clusterRegistrations.forEach((instanceIdentifier, clusterSingletonServiceRegistration) -> {
222 clusterSingletonServiceRegistration.close();
223 } catch (final Exception e) {
224 LOG.error("Error at unregistering from cluster. InstanceIdentifier: " + instanceIdentifier, e);
228 clusterRegistrations.clear();
232 * Sets the private key path from location specified in configuration file using blueprint.
234 public void setPrivateKeyPath(String privateKeyPath) {
235 this.privateKeyPath = privateKeyPath;
239 * Sets the private key passphrase from location specified in configuration file using blueprint.
241 public void setPrivateKeyPassphrase(String privateKeyPassphrase) {
242 this.privateKeyPassphrase = privateKeyPassphrase;
245 private ListenerRegistration<NetconfTopologyManager> registerDataTreeChangeListener(final String topologyId) {
246 final WriteTransaction wtx = dataBroker.newWriteOnlyTransaction();
247 initTopology(wtx, LogicalDatastoreType.CONFIGURATION, topologyId);
248 initTopology(wtx, LogicalDatastoreType.OPERATIONAL, topologyId);
249 Futures.addCallback(wtx.submit(), new FutureCallback<Void>() {
251 public void onSuccess(final Void result) {
252 LOG.debug("topology initialization successful");
256 public void onFailure(@Nonnull final Throwable throwable) {
257 LOG.error("Unable to initialize netconf-topology, {}", throwable);
259 }, MoreExecutors.directExecutor());
261 LOG.debug("Registering datastore listener");
262 return dataBroker.registerDataTreeChangeListener(
263 new DataTreeIdentifier<>(LogicalDatastoreType.CONFIGURATION,
264 NetconfTopologyUtils.createTopologyListPath(topologyId).child(Node.class)), this);
267 private void initTopology(final WriteTransaction wtx, final LogicalDatastoreType datastoreType,
268 final String topologyId) {
269 final NetworkTopology networkTopology = new NetworkTopologyBuilder().build();
270 final InstanceIdentifier<NetworkTopology> networkTopologyId =
271 InstanceIdentifier.builder(NetworkTopology.class).build();
272 wtx.merge(datastoreType, networkTopologyId, networkTopology);
273 final Topology topology = new TopologyBuilder().setTopologyId(new TopologyId(topologyId)).build();
274 wtx.merge(datastoreType, networkTopologyId.child(Topology.class,
275 new TopologyKey(new TopologyId(topologyId))), topology);
278 private NetconfTopologySetup createSetup(final InstanceIdentifier<Node> instanceIdentifier, final Node node) {
279 final NetconfTopologySetupBuilder builder = NetconfTopologySetupBuilder.create()
280 .setClusterSingletonServiceProvider(clusterSingletonServiceProvider)
281 .setDataBroker(dataBroker)
282 .setInstanceIdentifier(instanceIdentifier)
283 .setRpcProviderRegistry(rpcProviderRegistry)
285 .setActorSystem(actorSystem)
286 .setEventExecutor(eventExecutor)
287 .setKeepaliveExecutor(keepaliveExecutor)
288 .setProcessingExecutor(processingExecutor)
289 .setTopologyId(topologyId)
290 .setNetconfClientDispatcher(clientDispatcher)
291 .setSchemaResourceDTO(NetconfTopologyUtils.setupSchemaCacheDTO(node))
292 .setIdleTimeout(writeTxIdleTimeout)
293 .setPrivateKeyPath(privateKeyPath)
294 .setPrivateKeyPassphrase(privateKeyPassphrase)
295 .setEncryptionService(encryptionService);
297 return builder.build();