2 * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.netconf.topology.singleton.impl;
10 import static java.util.Objects.requireNonNull;
12 import akka.actor.ActorSystem;
13 import akka.util.Timeout;
14 import com.google.common.annotations.VisibleForTesting;
15 import com.google.common.util.concurrent.FutureCallback;
16 import com.google.common.util.concurrent.ListeningExecutorService;
17 import com.google.common.util.concurrent.MoreExecutors;
18 import io.netty.util.concurrent.EventExecutor;
19 import java.util.Collection;
21 import java.util.concurrent.ConcurrentHashMap;
22 import java.util.concurrent.ScheduledExecutorService;
23 import java.util.concurrent.TimeUnit;
24 import org.opendaylight.aaa.encrypt.AAAEncryptionService;
25 import org.opendaylight.controller.cluster.ActorSystemProvider;
26 import org.opendaylight.controller.config.threadpool.ScheduledThreadPool;
27 import org.opendaylight.controller.config.threadpool.ThreadPool;
28 import org.opendaylight.mdsal.binding.api.ClusteredDataTreeChangeListener;
29 import org.opendaylight.mdsal.binding.api.DataBroker;
30 import org.opendaylight.mdsal.binding.api.DataObjectModification;
31 import org.opendaylight.mdsal.binding.api.DataTreeIdentifier;
32 import org.opendaylight.mdsal.binding.api.DataTreeModification;
33 import org.opendaylight.mdsal.binding.api.WriteTransaction;
34 import org.opendaylight.mdsal.common.api.CommitInfo;
35 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
36 import org.opendaylight.mdsal.dom.api.DOMActionProviderService;
37 import org.opendaylight.mdsal.dom.api.DOMMountPointService;
38 import org.opendaylight.mdsal.dom.api.DOMRpcProviderService;
39 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
40 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceRegistration;
41 import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier;
42 import org.opendaylight.netconf.client.NetconfClientDispatcher;
43 import org.opendaylight.netconf.sal.connect.api.DeviceActionFactory;
44 import org.opendaylight.netconf.topology.singleton.api.NetconfTopologySingletonService;
45 import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup;
46 import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup.NetconfTopologySetupBuilder;
47 import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologyUtils;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNode;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.topology.singleton.config.rev170419.Config;
50 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology;
51 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopologyBuilder;
52 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
53 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.TopologyId;
54 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
55 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyBuilder;
56 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey;
57 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
58 import org.opendaylight.yangtools.concepts.ListenerRegistration;
59 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
60 import org.slf4j.Logger;
61 import org.slf4j.LoggerFactory;
62 import scala.concurrent.duration.Duration;
/**
 * Data tree change listener for the {@link Node} entries of one topology in the CONFIGURATION datastore.
 *
 * <p>For every node it keeps a {@link NetconfTopologyContext}, which is registered with the
 * {@link ClusterSingletonServiceProvider}; contexts are refreshed or stopped as the node configuration changes.
 */
64 public class NetconfTopologyManager
65 implements ClusteredDataTreeChangeListener<Node>, NetconfTopologySingletonService, AutoCloseable {
67 private static final Logger LOG = LoggerFactory.getLogger(NetconfTopologyManager.class);
// Context of every node currently tracked, keyed by the node's path.
69 private final Map<InstanceIdentifier<Node>, NetconfTopologyContext> contexts = new ConcurrentHashMap<>();
// Cluster singleton registration of every tracked context, keyed by the same node path as 'contexts'.
70 private final Map<InstanceIdentifier<Node>, ClusterSingletonServiceRegistration>
71 clusterRegistrations = new ConcurrentHashMap<>();
// Collaborators captured by the constructor; they are handed to each node's NetconfTopologySetup.
73 private final DataBroker dataBroker;
74 private final DOMRpcProviderService rpcProviderRegistry;
75 private final DOMActionProviderService actionProviderRegistry;
76 private final ClusterSingletonServiceProvider clusterSingletonServiceProvider;
77 private final ScheduledExecutorService keepaliveExecutor;
78 private final ListeningExecutorService processingExecutor;
79 private final ActorSystem actorSystem;
80 private final EventExecutor eventExecutor;
81 private final NetconfClientDispatcher clientDispatcher;
// Identifier of the topology whose nodes are watched.
82 private final String topologyId;
// Built in the constructor from the configured write-transaction idle timeout, interpreted as seconds.
83 private final Duration writeTxIdleTimeout;
84 private final DOMMountPointService mountPointService;
85 private final AAAEncryptionService encryptionService;
86 private final DeviceActionFactory deviceActionFactory;
// Registration of this listener with the data broker; reset to null by close().
87 private ListenerRegistration<NetconfTopologyManager> dataChangeListenerRegistration;
// Optional private key settings, assigned through the setters and copied into every node setup.
88 private String privateKeyPath;
89 private String privateKeyPassphrase;
/**
 * Creates a manager for the NETCONF nodes of a single topology.
 *
 * <p>The constructor only captures its collaborators; it does not touch the datastore or register any listener.
 *
 * @param dataBroker broker used for transactions and listener registration, must not be null
 * @param rpcProviderRegistry RPC provider registry passed on to node setups, must not be null
 * @param actionProviderService action provider registry passed on to node setups, must not be null
 * @param clusterSingletonServiceProvider provider the node contexts are registered with, must not be null
 * @param keepaliveExecutor thread pool whose executor is stored as the keepalive executor
 * @param processingExecutor thread pool whose executor is wrapped in a listening decorator
 * @param actorSystemProvider source of the actor system, must not be null
 * @param eventExecutor event executor passed on to node setups, must not be null
 * @param clientDispatcher NETCONF client dispatcher passed on to node setups, must not be null
 * @param topologyId identifier of the managed topology, must not be null
 * @param config configuration supplying the write-transaction idle timeout, read as seconds
 * @param mountPointService mount point service given to new contexts; stored without a null check
 * @param encryptionService encryption service passed on to node setups, must not be null
 * @param deviceActionFactory device action factory given to new contexts, must not be null
 */
public NetconfTopologyManager(final DataBroker dataBroker, final DOMRpcProviderService rpcProviderRegistry,
        final DOMActionProviderService actionProviderService,
        final ClusterSingletonServiceProvider clusterSingletonServiceProvider,
        final ScheduledThreadPool keepaliveExecutor, final ThreadPool processingExecutor,
        final ActorSystemProvider actorSystemProvider,
        final EventExecutor eventExecutor, final NetconfClientDispatcher clientDispatcher,
        final String topologyId, final Config config,
        final DOMMountPointService mountPointService,
        final AAAEncryptionService encryptionService,
        final DeviceActionFactory deviceActionFactory) {
    // Datastore access and the registries handed to every node setup.
    this.dataBroker = requireNonNull(dataBroker);
    this.rpcProviderRegistry = requireNonNull(rpcProviderRegistry);
    this.actionProviderRegistry = requireNonNull(actionProviderService);
    this.clusterSingletonServiceProvider = requireNonNull(clusterSingletonServiceProvider);

    // Unwrap the thread pools; the processing executor is decorated so it can return listenable futures.
    this.keepaliveExecutor = keepaliveExecutor.getExecutor();
    this.processingExecutor = MoreExecutors.listeningDecorator(processingExecutor.getExecutor());

    // Runtime infrastructure shared by all node contexts.
    this.actorSystem = requireNonNull(actorSystemProvider).getActorSystem();
    this.eventExecutor = requireNonNull(eventExecutor);
    this.clientDispatcher = requireNonNull(clientDispatcher);

    // Topology identity and the configured idle timeout, expressed in seconds.
    this.topologyId = requireNonNull(topologyId);
    this.writeTxIdleTimeout = Duration.apply(config.getWriteTransactionIdleTimeout().toJava(), TimeUnit.SECONDS);

    // Services forwarded to contexts and setups; mountPointService is deliberately left unchecked here.
    this.mountPointService = mountPointService;
    this.encryptionService = requireNonNull(encryptionService);
    this.deviceActionFactory = requireNonNull(deviceActionFactory);
}
119 // Blueprint init method
// Initializes the topology containers and registers this manager as a data tree change listener;
// the resulting registration is kept so that close() can release it.
121 dataChangeListenerRegistration = registerDataTreeChangeListener();
// Handles a batch of node modifications, one at a time: depending on the modification type the node's
// context is refreshed, started or stopped.
125 public void onDataTreeChanged(final Collection<DataTreeModification<Node>> changes) {
126 for (final DataTreeModification<Node> change : changes) {
127 final DataObjectModification<Node> rootNode = change.getRootNode();
// The node path doubles as the key into 'contexts' and 'clusterRegistrations'.
128 final InstanceIdentifier<Node> dataModifIdent = change.getRootPath().getRootIdentifier();
// The node id is only used for logging below.
129 final NodeId nodeId = NetconfTopologyUtils.getNodeId(rootNode.getIdentifier());
130 switch (rootNode.getModificationType()) {
131 case SUBTREE_MODIFIED:
// Part of an existing node changed: hand the new node data to its context.
132 LOG.debug("Config for node {} updated", nodeId);
133 refreshNetconfDeviceContext(dataModifIdent, rootNode.getDataAfter());
// A node that already has a context is reconfigured rather than started a second time.
136 if (contexts.containsKey(dataModifIdent)) {
137 LOG.debug("RemoteDevice{{}} was already configured, reconfiguring node...", nodeId);
138 refreshNetconfDeviceContext(dataModifIdent, rootNode.getDataAfter());
// Presumably the branch for a node without a context: a new context is started for it.
140 LOG.debug("Config for node {} created", nodeId);
141 startNetconfDeviceContext(dataModifIdent, rootNode.getDataAfter());
// Node configuration removed: stop its context and release the cluster registration.
145 LOG.debug("Config for node {} deleted", nodeId);
146 stopNetconfDeviceContext(dataModifIdent);
// Anything else is only reported.
149 LOG.warn("Unknown operation for {}.", nodeId);
154 private void refreshNetconfDeviceContext(final InstanceIdentifier<Node> instanceIdentifier, final Node node) {
155 final NetconfTopologyContext context = contexts.get(instanceIdentifier);
156 context.refresh(createSetup(instanceIdentifier, node));
// Creates the context for a newly configured node and registers it as a cluster singleton service.
// On success the registration and the context are stored under the node path.
159 // ClusterSingletonServiceRegistration registerClusterSingletonService method throws a Runtime exception if there
160 // are problems with registration and client has to deal with it. Only thing we can do if this error occurs is to
161 // retry registration several times and log the error.
162 // TODO change to a specific documented Exception when changed in ClusterSingletonServiceProvider
163 @SuppressWarnings("checkstyle:IllegalCatch")
164 private void startNetconfDeviceContext(final InstanceIdentifier<Node> instanceIdentifier, final Node node) {
// Fail fast with a NullPointerException when the node has no NetconfNode augmentation, host or IP address.
165 final NetconfNode netconfNode = node.augmentation(NetconfNode.class);
166 requireNonNull(netconfNode);
167 requireNonNull(netconfNode.getHost());
168 requireNonNull(netconfNode.getHost().getIpAddress());
// The node's actor-response-wait-time is interpreted as seconds.
170 final Timeout actorResponseWaitTime = new Timeout(
171 Duration.create(netconfNode.getActorResponseWaitTime().toJava(), "seconds"));
// The string form of the node path serves as the service group identifier.
173 final ServiceGroupIdentifier serviceGroupIdent =
174 ServiceGroupIdentifier.create(instanceIdentifier.toString());
176 final NetconfTopologyContext newNetconfTopologyContext = newNetconfTopologyContext(
177 createSetup(instanceIdentifier, node), serviceGroupIdent, actorResponseWaitTime, deviceActionFactory);
// Register the context and remember both the registration and the context under the node path.
182 final ClusterSingletonServiceRegistration clusterSingletonServiceRegistration =
183 clusterSingletonServiceProvider.registerClusterSingletonService(newNetconfTopologyContext);
184 clusterRegistrations.put(instanceIdentifier, clusterSingletonServiceRegistration);
185 contexts.put(instanceIdentifier, newNetconfTopologyContext);
// A failed registration attempt is logged; see the retry note above the method.
187 } catch (final RuntimeException e) {
188 LOG.warn("Unable to register cluster singleton service {}, trying again", newNetconfTopologyContext, e);
// Presumably reached once no attempts remain: the never-registered context is closed.
191 LOG.error("Unable to register cluster singleton service {} - done trying, closing topology context",
192 newNetconfTopologyContext, e);
193 close(newNetconfTopologyContext);
200 private void stopNetconfDeviceContext(final InstanceIdentifier<Node> instanceIdentifier) {
201 final NetconfTopologyContext netconfTopologyContext = contexts.remove(instanceIdentifier);
202 if (netconfTopologyContext != null) {
203 close(clusterRegistrations.remove(instanceIdentifier));
204 close(netconfTopologyContext);
// Factory for the per-node NetconfTopologyContext. It is protected, so a subclass can supply its own
// context. The manager's mountPointService is passed to the context along with the given arguments.
209 protected NetconfTopologyContext newNetconfTopologyContext(final NetconfTopologySetup setup,
210 final ServiceGroupIdentifier serviceGroupIdent, final Timeout actorResponseWaitTime,
211 final DeviceActionFactory deviceActionFact) {
212 return new NetconfTopologyContext(setup, serviceGroupIdent, actorResponseWaitTime, mountPointService,
// Shuts the manager down: unregisters the data tree change listener if it is registered, then closes
// every tracked context and every cluster singleton registration.
217 public void close() {
// Dropping the reference after closing keeps a repeated close() from closing the registration twice.
218 if (dataChangeListenerRegistration != null) {
219 dataChangeListenerRegistration.close();
220 dataChangeListenerRegistration = null;
// Close failures are handled per element by the static close(AutoCloseable) helper.
223 contexts.values().forEach(NetconfTopologyManager::close);
224 clusterRegistrations.values().forEach(NetconfTopologyManager::close);
227 clusterRegistrations.clear();
// Close helper shared by the shutdown paths: an Exception raised while closing is logged at WARN together
// with the resource instead of being propagated to the caller (hence the IllegalCatch suppression).
230 @SuppressWarnings("checkstyle:IllegalCatch")
231 private static void close(final AutoCloseable closeable) {
234 } catch (Exception e) {
235 LOG.warn("Error closing {}", closeable, e);
240 * Sets the private key path from location specified in configuration file using blueprint.
242 public void setPrivateKeyPath(final String privateKeyPath) {
243 this.privateKeyPath = privateKeyPath;
247 * Sets the private key passphrase from location specified in configuration file using blueprint.
249 public void setPrivateKeyPassphrase(final String privateKeyPassphrase) {
250 this.privateKeyPassphrase = privateKeyPassphrase;
253 private ListenerRegistration<NetconfTopologyManager> registerDataTreeChangeListener() {
254 final WriteTransaction wtx = dataBroker.newWriteOnlyTransaction();
255 initTopology(wtx, LogicalDatastoreType.CONFIGURATION);
256 initTopology(wtx, LogicalDatastoreType.OPERATIONAL);
257 wtx.commit().addCallback(new FutureCallback<CommitInfo>() {
259 public void onSuccess(final CommitInfo result) {
260 LOG.debug("topology initialization successful");
264 public void onFailure(final Throwable throwable) {
265 LOG.error("Unable to initialize netconf-topology", throwable);
267 }, MoreExecutors.directExecutor());
269 LOG.debug("Registering datastore listener");
270 return dataBroker.registerDataTreeChangeListener(DataTreeIdentifier.create(LogicalDatastoreType.CONFIGURATION,
271 NetconfTopologyUtils.createTopologyListPath(topologyId).child(Node.class)), this);
274 private void initTopology(final WriteTransaction wtx, final LogicalDatastoreType datastoreType) {
275 final NetworkTopology networkTopology = new NetworkTopologyBuilder().build();
276 final InstanceIdentifier<NetworkTopology> networkTopologyId =
277 InstanceIdentifier.builder(NetworkTopology.class).build();
278 wtx.merge(datastoreType, networkTopologyId, networkTopology);
279 final Topology topology = new TopologyBuilder().setTopologyId(new TopologyId(topologyId)).build();
280 wtx.merge(datastoreType, networkTopologyId.child(Topology.class,
281 new TopologyKey(new TopologyId(topologyId))), topology);
// Builds the NetconfTopologySetup for one node. Most values are the manager's shared collaborators;
// the node-specific parts are the instance identifier and the schema cache settings derived from the node.
284 private NetconfTopologySetup createSetup(final InstanceIdentifier<Node> instanceIdentifier, final Node node) {
285 final NetconfTopologySetupBuilder builder = NetconfTopologySetupBuilder.create()
286 .setClusterSingletonServiceProvider(clusterSingletonServiceProvider)
287 .setDataBroker(dataBroker)
288 .setInstanceIdentifier(instanceIdentifier)
289 .setRpcProviderRegistry(rpcProviderRegistry)
290 .setActionProviderRegistry(actionProviderRegistry)
292 .setActorSystem(actorSystem)
293 .setEventExecutor(eventExecutor)
294 .setKeepaliveExecutor(keepaliveExecutor)
295 .setProcessingExecutor(processingExecutor)
296 .setTopologyId(topologyId)
297 .setNetconfClientDispatcher(clientDispatcher)
298 .setSchemaResourceDTO(NetconfTopologyUtils.setupSchemaCacheDTO(node))
299 .setIdleTimeout(writeTxIdleTimeout)
// Private key settings may still be null when the setters were never called.
300 .setPrivateKeyPath(privateKeyPath)
301 .setPrivateKeyPassphrase(privateKeyPassphrase)
302 .setEncryptionService(encryptionService);
304 return builder.build();