/*
 * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
8 package org.opendaylight.netconf.topology.singleton.impl;
10 import static java.util.Objects.requireNonNull;
12 import akka.actor.ActorSystem;
13 import akka.util.Timeout;
14 import com.google.common.annotations.VisibleForTesting;
15 import com.google.common.util.concurrent.FutureCallback;
16 import com.google.common.util.concurrent.ListeningExecutorService;
17 import com.google.common.util.concurrent.MoreExecutors;
18 import io.netty.util.concurrent.EventExecutor;
19 import java.time.Duration;
20 import java.util.Collection;
22 import java.util.concurrent.ConcurrentHashMap;
23 import java.util.concurrent.ScheduledExecutorService;
24 import org.opendaylight.aaa.encrypt.AAAEncryptionService;
25 import org.opendaylight.controller.cluster.ActorSystemProvider;
26 import org.opendaylight.controller.config.threadpool.ScheduledThreadPool;
27 import org.opendaylight.controller.config.threadpool.ThreadPool;
28 import org.opendaylight.mdsal.binding.api.ClusteredDataTreeChangeListener;
29 import org.opendaylight.mdsal.binding.api.DataBroker;
30 import org.opendaylight.mdsal.binding.api.DataObjectModification;
31 import org.opendaylight.mdsal.binding.api.DataTreeIdentifier;
32 import org.opendaylight.mdsal.binding.api.DataTreeModification;
33 import org.opendaylight.mdsal.binding.api.RpcProviderService;
34 import org.opendaylight.mdsal.binding.api.WriteTransaction;
35 import org.opendaylight.mdsal.common.api.CommitInfo;
36 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
37 import org.opendaylight.mdsal.dom.api.DOMActionProviderService;
38 import org.opendaylight.mdsal.dom.api.DOMMountPointService;
39 import org.opendaylight.mdsal.dom.api.DOMRpcProviderService;
40 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
41 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceRegistration;
42 import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier;
43 import org.opendaylight.netconf.client.NetconfClientDispatcher;
44 import org.opendaylight.netconf.sal.connect.api.DeviceActionFactory;
45 import org.opendaylight.netconf.sal.connect.api.SchemaResourceManager;
46 import org.opendaylight.netconf.sal.connect.netconf.schema.mapping.BaseNetconfSchemas;
47 import org.opendaylight.netconf.sal.connect.util.NetconfTopologyRPCProvider;
48 import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
49 import org.opendaylight.netconf.topology.singleton.api.NetconfTopologySingletonService;
50 import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup;
51 import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup.NetconfTopologySetupBuilder;
52 import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologyUtils;
53 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNode;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNodeTopologyService;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.topology.singleton.config.rev170419.Config;
56 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology;
57 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopologyBuilder;
58 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
59 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.TopologyId;
60 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
61 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyBuilder;
62 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey;
63 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
64 import org.opendaylight.yangtools.concepts.ListenerRegistration;
65 import org.opendaylight.yangtools.concepts.Registration;
66 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
67 import org.slf4j.Logger;
68 import org.slf4j.LoggerFactory;
70 public class NetconfTopologyManager
71 implements ClusteredDataTreeChangeListener<Node>, NetconfTopologySingletonService, AutoCloseable {
73 private static final Logger LOG = LoggerFactory.getLogger(NetconfTopologyManager.class);
// Per-node topology contexts keyed by the node's instance identifier: entries are added by
// startNetconfDeviceContext, removed by stopNetconfDeviceContext and all closed by close().
75 private final Map<InstanceIdentifier<Node>, NetconfTopologyContext> contexts = new ConcurrentHashMap<>();
// Cluster singleton registrations under the same keys; an entry is stored right before the
// matching context is put into the map above.
76 private final Map<InstanceIdentifier<Node>, ClusterSingletonServiceRegistration>
77 clusterRegistrations = new ConcurrentHashMap<>();
// Collaborators captured by the constructor. Most are stored as passed in; the two executors and the
// actor system are obtained from their providers, and the idle timeout is derived from the Config.
79 private final BaseNetconfSchemas baseSchemas;
80 private final DataBroker dataBroker;
81 private final DOMRpcProviderService rpcProviderRegistry;
82 private final DOMActionProviderService actionProviderRegistry;
83 private final ClusterSingletonServiceProvider clusterSingletonServiceProvider;
84 private final ScheduledExecutorService keepaliveExecutor;
85 private final ListeningExecutorService processingExecutor;
86 private final ActorSystem actorSystem;
87 private final EventExecutor eventExecutor;
88 private final NetconfClientDispatcher clientDispatcher;
89 private final String topologyId;
90 private final Duration writeTxIdleTimeout;
91 private final DOMMountPointService mountPointService;
92 private final AAAEncryptionService encryptionService;
93 private final RpcProviderService rpcProviderService;
94 private final DeviceActionFactory deviceActionFactory;
95 private final SchemaResourceManager resourceManager;
// Registrations assigned after construction (by the statements following the "Blueprint init method"
// comment) and examined again in close().
97 private ListenerRegistration<NetconfTopologyManager> dataChangeListenerRegistration;
98 private Registration rpcReg;
// Private-key settings supplied through setPrivateKeyPath/setPrivateKeyPassphrase and forwarded to the
// setup builder by createSetup; they stay null unless those setters are called.
99 private String privateKeyPath;
100 private String privateKeyPassphrase;
/**
 * Creates the manager and captures its collaborators. The constructor only stores references (and
 * values derived from them); it does not register any listener or RPC implementation.
 *
 * @param baseSchemas base NETCONF schemas handed to every per-node setup
 * @param dataBroker data broker used for topology initialization and change notifications
 * @param rpcProviderRegistry DOM RPC provider service handed to every per-node setup
 * @param actionProviderService DOM action provider service handed to every per-node setup
 * @param clusterSingletonServiceProvider provider used to register per-node singleton services
 * @param keepaliveExecutor thread pool whose executor is used as the keepalive executor
 * @param processingExecutor thread pool whose executor is wrapped into a listening executor
 * @param actorSystemProvider source of the actor system
 * @param eventExecutor event executor handed to every per-node setup
 * @param clientDispatcher NETCONF client dispatcher handed to every per-node setup
 * @param topologyId identifier of the topology this manager is responsible for
 * @param config configuration supplying the write-transaction idle timeout, read as seconds
 * @param mountPointService mount point service passed to new topology contexts; stored without a
 *     null check
 * @param encryptionService encryption service handed to every per-node setup and the RPC provider
 * @param rpcProviderService binding RPC provider service used to register the topology RPCs
 * @param deviceActionFactory factory passed on when a topology context is created
 * @param resourceManager source of per-device schema resources
 * @throws NullPointerException if any argument other than {@code mountPointService} is null; the
 *     message names the offending argument
 */
public NetconfTopologyManager(final BaseNetconfSchemas baseSchemas, final DataBroker dataBroker,
        final DOMRpcProviderService rpcProviderRegistry,
        final DOMActionProviderService actionProviderService,
        final ClusterSingletonServiceProvider clusterSingletonServiceProvider,
        final ScheduledThreadPool keepaliveExecutor, final ThreadPool processingExecutor,
        final ActorSystemProvider actorSystemProvider,
        final EventExecutor eventExecutor, final NetconfClientDispatcher clientDispatcher,
        final String topologyId, final Config config,
        final DOMMountPointService mountPointService,
        final AAAEncryptionService encryptionService,
        final RpcProviderService rpcProviderService,
        final DeviceActionFactory deviceActionFactory,
        final SchemaResourceManager resourceManager) {
    this.baseSchemas = requireNonNull(baseSchemas, "baseSchemas");
    this.dataBroker = requireNonNull(dataBroker, "dataBroker");
    this.rpcProviderRegistry = requireNonNull(rpcProviderRegistry, "rpcProviderRegistry");
    this.actionProviderRegistry = requireNonNull(actionProviderService, "actionProviderService");
    this.clusterSingletonServiceProvider =
        requireNonNull(clusterSingletonServiceProvider, "clusterSingletonServiceProvider");
    // The thread pools and the config are dereferenced right away, so guard them explicitly to get a
    // NullPointerException that names the missing dependency.
    this.keepaliveExecutor = requireNonNull(keepaliveExecutor, "keepaliveExecutor").getExecutor();
    this.processingExecutor = MoreExecutors.listeningDecorator(
        requireNonNull(processingExecutor, "processingExecutor").getExecutor());
    this.actorSystem = requireNonNull(actorSystemProvider, "actorSystemProvider").getActorSystem();
    this.eventExecutor = requireNonNull(eventExecutor, "eventExecutor");
    this.clientDispatcher = requireNonNull(clientDispatcher, "clientDispatcher");
    this.topologyId = requireNonNull(topologyId, "topologyId");
    this.writeTxIdleTimeout = Duration.ofSeconds(
        requireNonNull(config, "config").getWriteTransactionIdleTimeout().toJava());
    // Deliberately left unchecked, exactly as before: a null mount point service is accepted here.
    this.mountPointService = mountPointService;
    this.encryptionService = requireNonNull(encryptionService, "encryptionService");
    this.rpcProviderService = requireNonNull(rpcProviderService, "rpcProviderService");
    this.deviceActionFactory = requireNonNull(deviceActionFactory, "deviceActionFactory");
    this.resourceManager = requireNonNull(resourceManager, "resourceManager");
}
134 // Blueprint init method
// NOTE(review): the method declaration that should follow the comment above is not visible in this
// copy of the file (presumably a no-argument init method) - confirm against the complete source.
// Initializes the topology in the datastore and subscribes this manager to node configuration
// changes; the returned registration is kept so that close() can release it.
136 dataChangeListenerRegistration = registerDataTreeChangeListener();
// Registers a NetconfTopologyRPCProvider as the NetconfNodeTopologyService implementation and keeps
// the resulting registration in rpcReg.
137 rpcReg = rpcProviderService.registerRpcImplementation(NetconfNodeTopologyService.class,
138 new NetconfTopologyRPCProvider(dataBroker, encryptionService, topologyId));
142 public void onDataTreeChanged(final Collection<DataTreeModification<Node>> changes) {
143 for (final DataTreeModification<Node> change : changes) {
144 final DataObjectModification<Node> rootNode = change.getRootNode();
145 final InstanceIdentifier<Node> dataModifIdent = change.getRootPath().getRootIdentifier();
146 final NodeId nodeId = NetconfTopologyUtils.getNodeId(rootNode.getIdentifier());
147 switch (rootNode.getModificationType()) {
148 case SUBTREE_MODIFIED:
149 LOG.debug("Config for node {} updated", nodeId);
150 refreshNetconfDeviceContext(dataModifIdent, rootNode.getDataAfter());
153 if (contexts.containsKey(dataModifIdent)) {
154 LOG.debug("RemoteDevice{{}} was already configured, reconfiguring node...", nodeId);
155 refreshNetconfDeviceContext(dataModifIdent, rootNode.getDataAfter());
157 LOG.debug("Config for node {} created", nodeId);
158 startNetconfDeviceContext(dataModifIdent, rootNode.getDataAfter());
162 LOG.debug("Config for node {} deleted", nodeId);
163 stopNetconfDeviceContext(dataModifIdent);
166 LOG.warn("Unknown operation for {}.", nodeId);
171 private void refreshNetconfDeviceContext(final InstanceIdentifier<Node> instanceIdentifier, final Node node) {
172 final NetconfTopologyContext context = contexts.get(instanceIdentifier);
173 context.refresh(createSetup(instanceIdentifier, node));
176 // ClusterSingletonServiceRegistration registerClusterSingletonService method throws a Runtime exception if there
177 // are problems with registration and client has to deal with it. Only thing we can do if this error occurs is to
178 // retry registration several times and log the error.
179 // TODO change to a specific documented Exception when changed in ClusterSingletonServiceProvider
// Creates the topology context for a newly configured node and registers it as a cluster singleton
// service; on success the registration and the context are stored under the node's identifier.
180 @SuppressWarnings("checkstyle:IllegalCatch")
181 private void startNetconfDeviceContext(final InstanceIdentifier<Node> instanceIdentifier, final Node node) {
// Fail fast with a NullPointerException when the node has no NetconfNode augmentation, no host, or a
// host without an IP address value.
182 final NetconfNode netconfNode = node.augmentation(NetconfNode.class);
183 requireNonNull(netconfNode);
184 requireNonNull(netconfNode.getHost());
185 requireNonNull(netconfNode.getHost().getIpAddress());
// The node's actor response wait time is read as a number of seconds.
187 final Timeout actorResponseWaitTime = Timeout.create(
188 Duration.ofSeconds(netconfNode.getActorResponseWaitTime().toJava()));
// The service group identifier is derived from the string form of the node's instance identifier.
190 final ServiceGroupIdentifier serviceGroupIdent =
191 ServiceGroupIdentifier.create(instanceIdentifier.toString());
// Goes through the overridable factory method rather than calling the context constructor directly.
193 final NetconfTopologyContext newNetconfTopologyContext = newNetconfTopologyContext(
194 createSetup(instanceIdentifier, node), serviceGroupIdent, actorResponseWaitTime, deviceActionFactory);
// NOTE(review): the retry construct announced in the comment above the method (the loop, the opening
// of the try block and the check for remaining attempts) is not visible in this copy of the file, so
// the number of attempts cannot be determined from here - confirm against the complete source.
// The registration is stored first, then the context, both under the same key.
199 final ClusterSingletonServiceRegistration clusterSingletonServiceRegistration =
200 clusterSingletonServiceProvider.registerClusterSingletonService(newNetconfTopologyContext);
201 clusterRegistrations.put(instanceIdentifier, clusterSingletonServiceRegistration);
202 contexts.put(instanceIdentifier, newNetconfTopologyContext);
// Any RuntimeException from the registration is logged as a warning.
204 } catch (final RuntimeException e) {
205 LOG.warn("Unable to register cluster singleton service {}, trying again", newNetconfTopologyContext, e);
// Final failure path: log an error and close the context that could not be registered, so it is not
// left open without a registration.
208 LOG.error("Unable to register cluster singleton service {} - done trying, closing topology context",
209 newNetconfTopologyContext, e);
210 close(newNetconfTopologyContext);
217 private void stopNetconfDeviceContext(final InstanceIdentifier<Node> instanceIdentifier) {
218 final NetconfTopologyContext netconfTopologyContext = contexts.remove(instanceIdentifier);
219 if (netconfTopologyContext != null) {
220 close(clusterRegistrations.remove(instanceIdentifier));
221 close(netconfTopologyContext);
// Factory method that builds the NetconfTopologyContext for one node. It is protected, so a subclass
// can override it (presumably for tests - confirm).
226 protected NetconfTopologyContext newNetconfTopologyContext(final NetconfTopologySetup setup,
227 final ServiceGroupIdentifier serviceGroupIdent, final Timeout actorResponseWaitTime,
228 final DeviceActionFactory deviceActionFact) {
// NOTE(review): the argument list below ends with a comma and the rest of the statement is not
// visible in this copy of the file; deviceActionFact is presumably passed in the missing part -
// confirm against the complete source.
229 return new NetconfTopologyContext(setup, serviceGroupIdent, actorResponseWaitTime, mountPointService,
// Shuts the manager down: releases the data tree change listener registration and closes every
// per-node context and every cluster singleton registration. Handling of the RPC registration is
// only partly visible, see the note below.
234 public void close() {
// NOTE(review): the body of this null check is not visible in this copy of the file; presumably it
// closes rpcReg and resets the field like the listener block below - confirm.
235 if (rpcReg != null) {
// Unregister the datastore listener and null the field so that a repeated close() skips it.
239 if (dataChangeListenerRegistration != null) {
240 dataChangeListenerRegistration.close();
241 dataChangeListenerRegistration = null;
// The static close helper logs failures instead of propagating them, so one entry that fails to
// close does not stop the remaining entries from being closed.
244 contexts.values().forEach(NetconfTopologyManager::close);
245 clusterRegistrations.values().forEach(NetconfTopologyManager::close);
// NOTE(review): only clusterRegistrations is visibly cleared; a matching clear of contexts may sit
// on a line that is missing from this copy - confirm.
248 clusterRegistrations.clear();
251 @SuppressWarnings("checkstyle:IllegalCatch")
252 private static void close(final AutoCloseable closeable) {
255 } catch (Exception e) {
256 LOG.warn("Error closing {}", closeable, e);
261 * Sets the private key path from location specified in configuration file using blueprint.
263 public void setPrivateKeyPath(final String privateKeyPath) {
264 this.privateKeyPath = privateKeyPath;
268 * Sets the private key passphrase from location specified in configuration file using blueprint.
270 public void setPrivateKeyPassphrase(final String privateKeyPassphrase) {
271 this.privateKeyPassphrase = privateKeyPassphrase;
274 private ListenerRegistration<NetconfTopologyManager> registerDataTreeChangeListener() {
275 final WriteTransaction wtx = dataBroker.newWriteOnlyTransaction();
276 initTopology(wtx, LogicalDatastoreType.CONFIGURATION);
277 initTopology(wtx, LogicalDatastoreType.OPERATIONAL);
278 wtx.commit().addCallback(new FutureCallback<CommitInfo>() {
280 public void onSuccess(final CommitInfo result) {
281 LOG.debug("topology initialization successful");
285 public void onFailure(final Throwable throwable) {
286 LOG.error("Unable to initialize netconf-topology", throwable);
288 }, MoreExecutors.directExecutor());
290 LOG.debug("Registering datastore listener");
291 return dataBroker.registerDataTreeChangeListener(DataTreeIdentifier.create(LogicalDatastoreType.CONFIGURATION,
292 NetconfTopologyUtils.createTopologyListPath(topologyId).child(Node.class)), this);
295 private void initTopology(final WriteTransaction wtx, final LogicalDatastoreType datastoreType) {
296 final NetworkTopology networkTopology = new NetworkTopologyBuilder().build();
297 final InstanceIdentifier<NetworkTopology> networkTopologyId =
298 InstanceIdentifier.builder(NetworkTopology.class).build();
299 wtx.merge(datastoreType, networkTopologyId, networkTopology);
300 final Topology topology = new TopologyBuilder().setTopologyId(new TopologyId(topologyId)).build();
301 wtx.merge(datastoreType, networkTopologyId.child(Topology.class,
302 new TopologyKey(new TopologyId(topologyId))), topology);
// Assembles the NetconfTopologySetup for one node from the manager's collaborators, the node's
// instance identifier and the node's NetconfNode augmentation.
305 private NetconfTopologySetup createSetup(final InstanceIdentifier<Node> instanceIdentifier, final Node node) {
306 final NetconfNode netconfNode = node.augmentation(NetconfNode.class);
// The device id is derived from the node id together with the netconf augmentation.
307 final RemoteDeviceId deviceId = NetconfTopologyUtils.createRemoteDeviceId(node.getNodeId(), netconfNode);
309 final NetconfTopologySetupBuilder builder = NetconfTopologySetupBuilder.create()
310 .setClusterSingletonServiceProvider(clusterSingletonServiceProvider)
311 .setBaseSchemas(baseSchemas)
312 .setDataBroker(dataBroker)
313 .setInstanceIdentifier(instanceIdentifier)
314 .setRpcProviderRegistry(rpcProviderRegistry)
315 .setActionProviderRegistry(actionProviderRegistry)
// NOTE(review): one line of the builder chain appears to be missing from this copy of the file at
// this point; the node parameter is not handed to the builder anywhere in the visible lines -
// confirm against the complete source.
317 .setActorSystem(actorSystem)
318 .setEventExecutor(eventExecutor)
319 .setKeepaliveExecutor(keepaliveExecutor)
320 .setProcessingExecutor(processingExecutor)
321 .setTopologyId(topologyId)
322 .setNetconfClientDispatcher(clientDispatcher)
// Schema resources are looked up per device through the SchemaResourceManager.
323 .setSchemaResourceDTO(resourceManager.getSchemaResources(netconfNode, deviceId))
324 .setIdleTimeout(writeTxIdleTimeout)
// Both private-key values are null unless the corresponding setters were called beforehand.
325 .setPrivateKeyPath(privateKeyPath)
326 .setPrivateKeyPassphrase(privateKeyPassphrase)
327 .setEncryptionService(encryptionService);
329 return builder.build();