2 * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.netconf.topology.singleton.impl;
10 import static java.util.Objects.requireNonNull;
12 import akka.actor.ActorSystem;
13 import akka.util.Timeout;
14 import com.google.common.annotations.VisibleForTesting;
15 import com.google.common.util.concurrent.FutureCallback;
16 import com.google.common.util.concurrent.ListeningExecutorService;
17 import com.google.common.util.concurrent.MoreExecutors;
18 import io.netty.util.concurrent.EventExecutor;
19 import java.time.Duration;
20 import java.util.Collection;
22 import java.util.concurrent.ConcurrentHashMap;
23 import java.util.concurrent.ScheduledExecutorService;
24 import org.opendaylight.aaa.encrypt.AAAEncryptionService;
25 import org.opendaylight.controller.cluster.ActorSystemProvider;
26 import org.opendaylight.controller.config.threadpool.ScheduledThreadPool;
27 import org.opendaylight.controller.config.threadpool.ThreadPool;
28 import org.opendaylight.mdsal.binding.api.ClusteredDataTreeChangeListener;
29 import org.opendaylight.mdsal.binding.api.DataBroker;
30 import org.opendaylight.mdsal.binding.api.DataObjectModification;
31 import org.opendaylight.mdsal.binding.api.DataTreeIdentifier;
32 import org.opendaylight.mdsal.binding.api.DataTreeModification;
33 import org.opendaylight.mdsal.binding.api.RpcProviderService;
34 import org.opendaylight.mdsal.binding.api.WriteTransaction;
35 import org.opendaylight.mdsal.common.api.CommitInfo;
36 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
37 import org.opendaylight.mdsal.dom.api.DOMActionProviderService;
38 import org.opendaylight.mdsal.dom.api.DOMMountPointService;
39 import org.opendaylight.mdsal.dom.api.DOMRpcProviderService;
40 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
41 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceRegistration;
42 import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier;
43 import org.opendaylight.netconf.client.NetconfClientDispatcher;
44 import org.opendaylight.netconf.sal.connect.api.DeviceActionFactory;
45 import org.opendaylight.netconf.sal.connect.api.SchemaResourceManager;
46 import org.opendaylight.netconf.sal.connect.netconf.schema.mapping.BaseNetconfSchemas;
47 import org.opendaylight.netconf.sal.connect.util.NetconfTopologyRPCProvider;
48 import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
49 import org.opendaylight.netconf.topology.singleton.api.NetconfTopologySingletonService;
50 import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup;
51 import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup.NetconfTopologySetupBuilder;
52 import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologyUtils;
53 import org.opendaylight.netconf.topology.spi.NetconfNodeUtils;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNode;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNodeTopologyService;
56 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.topology.singleton.config.rev170419.Config;
57 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology;
58 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopologyBuilder;
59 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
60 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.TopologyId;
61 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
62 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyBuilder;
63 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey;
64 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
65 import org.opendaylight.yangtools.concepts.ListenerRegistration;
66 import org.opendaylight.yangtools.concepts.Registration;
67 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
68 import org.slf4j.Logger;
69 import org.slf4j.LoggerFactory;
71 public class NetconfTopologyManager
72 implements ClusteredDataTreeChangeListener<Node>, NetconfTopologySingletonService, AutoCloseable {
74 private static final Logger LOG = LoggerFactory.getLogger(NetconfTopologyManager.class);
76 private final Map<InstanceIdentifier<Node>, NetconfTopologyContext> contexts = new ConcurrentHashMap<>();
77 private final Map<InstanceIdentifier<Node>, ClusterSingletonServiceRegistration>
78 clusterRegistrations = new ConcurrentHashMap<>();
80 private final BaseNetconfSchemas baseSchemas;
81 private final DataBroker dataBroker;
82 private final DOMRpcProviderService rpcProviderRegistry;
83 private final DOMActionProviderService actionProviderRegistry;
84 private final ClusterSingletonServiceProvider clusterSingletonServiceProvider;
85 private final ScheduledExecutorService keepaliveExecutor;
86 private final ListeningExecutorService processingExecutor;
87 private final ActorSystem actorSystem;
88 private final EventExecutor eventExecutor;
89 private final NetconfClientDispatcher clientDispatcher;
90 private final String topologyId;
91 private final Duration writeTxIdleTimeout;
92 private final DOMMountPointService mountPointService;
93 private final AAAEncryptionService encryptionService;
94 private final RpcProviderService rpcProviderService;
95 private final DeviceActionFactory deviceActionFactory;
96 private final SchemaResourceManager resourceManager;
98 private ListenerRegistration<NetconfTopologyManager> dataChangeListenerRegistration;
99 private Registration rpcReg;
100 private String privateKeyPath;
101 private String privateKeyPassphrase;
103 public NetconfTopologyManager(final BaseNetconfSchemas baseSchemas, final DataBroker dataBroker,
104 final DOMRpcProviderService rpcProviderRegistry,
105 final DOMActionProviderService actionProviderService,
106 final ClusterSingletonServiceProvider clusterSingletonServiceProvider,
107 final ScheduledThreadPool keepaliveExecutor, final ThreadPool processingExecutor,
108 final ActorSystemProvider actorSystemProvider,
109 final EventExecutor eventExecutor, final NetconfClientDispatcher clientDispatcher,
110 final String topologyId, final Config config,
111 final DOMMountPointService mountPointService,
112 final AAAEncryptionService encryptionService,
113 final RpcProviderService rpcProviderService,
114 final DeviceActionFactory deviceActionFactory,
115 final SchemaResourceManager resourceManager) {
116 this.baseSchemas = requireNonNull(baseSchemas);
117 this.dataBroker = requireNonNull(dataBroker);
118 this.rpcProviderRegistry = requireNonNull(rpcProviderRegistry);
119 this.actionProviderRegistry = requireNonNull(actionProviderService);
120 this.clusterSingletonServiceProvider = requireNonNull(clusterSingletonServiceProvider);
121 this.keepaliveExecutor = keepaliveExecutor.getExecutor();
122 this.processingExecutor = MoreExecutors.listeningDecorator(processingExecutor.getExecutor());
123 this.actorSystem = requireNonNull(actorSystemProvider).getActorSystem();
124 this.eventExecutor = requireNonNull(eventExecutor);
125 this.clientDispatcher = requireNonNull(clientDispatcher);
126 this.topologyId = requireNonNull(topologyId);
127 this.writeTxIdleTimeout = Duration.ofSeconds(config.getWriteTransactionIdleTimeout().toJava());
128 this.mountPointService = mountPointService;
129 this.encryptionService = requireNonNull(encryptionService);
130 this.rpcProviderService = requireNonNull(rpcProviderService);
131 this.deviceActionFactory = requireNonNull(deviceActionFactory);
132 this.resourceManager = requireNonNull(resourceManager);
135 // Blueprint init method
137 dataChangeListenerRegistration = registerDataTreeChangeListener();
138 rpcReg = rpcProviderService.registerRpcImplementation(NetconfNodeTopologyService.class,
139 new NetconfTopologyRPCProvider(dataBroker, encryptionService, topologyId));
143 public void onDataTreeChanged(final Collection<DataTreeModification<Node>> changes) {
144 for (final DataTreeModification<Node> change : changes) {
145 final DataObjectModification<Node> rootNode = change.getRootNode();
146 final InstanceIdentifier<Node> dataModifIdent = change.getRootPath().getRootIdentifier();
147 final NodeId nodeId = NetconfTopologyUtils.getNodeId(rootNode.getIdentifier());
148 switch (rootNode.getModificationType()) {
149 case SUBTREE_MODIFIED:
150 LOG.debug("Config for node {} updated", nodeId);
151 refreshNetconfDeviceContext(dataModifIdent, rootNode.getDataAfter());
154 if (contexts.containsKey(dataModifIdent)) {
155 LOG.debug("RemoteDevice{{}} was already configured, reconfiguring node...", nodeId);
156 refreshNetconfDeviceContext(dataModifIdent, rootNode.getDataAfter());
158 LOG.debug("Config for node {} created", nodeId);
159 startNetconfDeviceContext(dataModifIdent, rootNode.getDataAfter());
163 LOG.debug("Config for node {} deleted", nodeId);
164 stopNetconfDeviceContext(dataModifIdent);
167 LOG.warn("Unknown operation for {}.", nodeId);
172 private void refreshNetconfDeviceContext(final InstanceIdentifier<Node> instanceIdentifier, final Node node) {
173 final NetconfTopologyContext context = contexts.get(instanceIdentifier);
174 context.refresh(createSetup(instanceIdentifier, node));
177 // ClusterSingletonServiceRegistration registerClusterSingletonService method throws a Runtime exception if there
178 // are problems with registration and client has to deal with it. Only thing we can do if this error occurs is to
179 // retry registration several times and log the error.
180 // TODO change to a specific documented Exception when changed in ClusterSingletonServiceProvider
181 @SuppressWarnings("checkstyle:IllegalCatch")
182 private void startNetconfDeviceContext(final InstanceIdentifier<Node> instanceIdentifier, final Node node) {
183 final NetconfNode netconfNode = requireNonNull(node.augmentation(NetconfNode.class));
185 final Timeout actorResponseWaitTime = Timeout.create(
186 Duration.ofSeconds(netconfNode.getActorResponseWaitTime().toJava()));
188 final ServiceGroupIdentifier serviceGroupIdent =
189 ServiceGroupIdentifier.create(instanceIdentifier.toString());
191 final NetconfTopologyContext newNetconfTopologyContext = newNetconfTopologyContext(
192 createSetup(instanceIdentifier, node), serviceGroupIdent, actorResponseWaitTime, deviceActionFactory);
197 final ClusterSingletonServiceRegistration clusterSingletonServiceRegistration =
198 clusterSingletonServiceProvider.registerClusterSingletonService(newNetconfTopologyContext);
199 clusterRegistrations.put(instanceIdentifier, clusterSingletonServiceRegistration);
200 contexts.put(instanceIdentifier, newNetconfTopologyContext);
202 } catch (final RuntimeException e) {
203 LOG.warn("Unable to register cluster singleton service {}, trying again", newNetconfTopologyContext, e);
206 LOG.error("Unable to register cluster singleton service {} - done trying, closing topology context",
207 newNetconfTopologyContext, e);
208 close(newNetconfTopologyContext);
215 private void stopNetconfDeviceContext(final InstanceIdentifier<Node> instanceIdentifier) {
216 final NetconfTopologyContext netconfTopologyContext = contexts.remove(instanceIdentifier);
217 if (netconfTopologyContext != null) {
218 close(clusterRegistrations.remove(instanceIdentifier));
219 close(netconfTopologyContext);
224 protected NetconfTopologyContext newNetconfTopologyContext(final NetconfTopologySetup setup,
225 final ServiceGroupIdentifier serviceGroupIdent, final Timeout actorResponseWaitTime,
226 final DeviceActionFactory deviceActionFact) {
227 return new NetconfTopologyContext(setup, serviceGroupIdent, actorResponseWaitTime, mountPointService,
232 public void close() {
233 if (rpcReg != null) {
237 if (dataChangeListenerRegistration != null) {
238 dataChangeListenerRegistration.close();
239 dataChangeListenerRegistration = null;
242 contexts.values().forEach(NetconfTopologyManager::close);
243 clusterRegistrations.values().forEach(NetconfTopologyManager::close);
246 clusterRegistrations.clear();
249 @SuppressWarnings("checkstyle:IllegalCatch")
250 private static void close(final AutoCloseable closeable) {
253 } catch (Exception e) {
254 LOG.warn("Error closing {}", closeable, e);
259 * Sets the private key path from location specified in configuration file using blueprint.
261 public void setPrivateKeyPath(final String privateKeyPath) {
262 this.privateKeyPath = privateKeyPath;
266 * Sets the private key passphrase from location specified in configuration file using blueprint.
268 public void setPrivateKeyPassphrase(final String privateKeyPassphrase) {
269 this.privateKeyPassphrase = privateKeyPassphrase;
272 private ListenerRegistration<NetconfTopologyManager> registerDataTreeChangeListener() {
273 final WriteTransaction wtx = dataBroker.newWriteOnlyTransaction();
274 initTopology(wtx, LogicalDatastoreType.CONFIGURATION);
275 initTopology(wtx, LogicalDatastoreType.OPERATIONAL);
276 wtx.commit().addCallback(new FutureCallback<CommitInfo>() {
278 public void onSuccess(final CommitInfo result) {
279 LOG.debug("topology initialization successful");
283 public void onFailure(final Throwable throwable) {
284 LOG.error("Unable to initialize netconf-topology", throwable);
286 }, MoreExecutors.directExecutor());
288 LOG.debug("Registering datastore listener");
289 return dataBroker.registerDataTreeChangeListener(DataTreeIdentifier.create(LogicalDatastoreType.CONFIGURATION,
290 NetconfTopologyUtils.createTopologyListPath(topologyId).child(Node.class)), this);
293 private void initTopology(final WriteTransaction wtx, final LogicalDatastoreType datastoreType) {
294 final NetworkTopology networkTopology = new NetworkTopologyBuilder().build();
295 final InstanceIdentifier<NetworkTopology> networkTopologyId =
296 InstanceIdentifier.builder(NetworkTopology.class).build();
297 wtx.merge(datastoreType, networkTopologyId, networkTopology);
298 final Topology topology = new TopologyBuilder().setTopologyId(new TopologyId(topologyId)).build();
299 wtx.merge(datastoreType, networkTopologyId.child(Topology.class,
300 new TopologyKey(new TopologyId(topologyId))), topology);
303 private NetconfTopologySetup createSetup(final InstanceIdentifier<Node> instanceIdentifier, final Node node) {
304 final NetconfNode netconfNode = node.augmentation(NetconfNode.class);
305 final RemoteDeviceId deviceId = NetconfNodeUtils.toRemoteDeviceId(node.getNodeId(), netconfNode);
307 return NetconfTopologySetupBuilder.create()
308 .setClusterSingletonServiceProvider(clusterSingletonServiceProvider)
309 .setBaseSchemas(baseSchemas)
310 .setDataBroker(dataBroker)
311 .setInstanceIdentifier(instanceIdentifier)
312 .setRpcProviderRegistry(rpcProviderRegistry)
313 .setActionProviderRegistry(actionProviderRegistry)
315 .setActorSystem(actorSystem)
316 .setEventExecutor(eventExecutor)
317 .setKeepaliveExecutor(keepaliveExecutor)
318 .setProcessingExecutor(processingExecutor)
319 .setTopologyId(topologyId)
320 .setNetconfClientDispatcher(clientDispatcher)
321 .setSchemaResourceDTO(resourceManager.getSchemaResources(netconfNode, deviceId))
322 .setIdleTimeout(writeTxIdleTimeout)
323 .setPrivateKeyPath(privateKeyPath)
324 .setPrivateKeyPassphrase(privateKeyPassphrase)
325 .setEncryptionService(encryptionService)