/*
 * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
package org.opendaylight.netconf.topology.singleton.impl;

import static java.util.Objects.requireNonNull;

import akka.actor.ActorSystem;
import akka.util.Timeout;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import io.netty.util.concurrent.EventExecutor;
import java.time.Duration;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import org.opendaylight.aaa.encrypt.AAAEncryptionService;
import org.opendaylight.controller.cluster.ActorSystemProvider;
import org.opendaylight.controller.config.threadpool.ScheduledThreadPool;
import org.opendaylight.controller.config.threadpool.ThreadPool;
import org.opendaylight.mdsal.binding.api.ClusteredDataTreeChangeListener;
import org.opendaylight.mdsal.binding.api.DataBroker;
import org.opendaylight.mdsal.binding.api.DataObjectModification;
import org.opendaylight.mdsal.binding.api.DataTreeIdentifier;
import org.opendaylight.mdsal.binding.api.DataTreeModification;
import org.opendaylight.mdsal.binding.api.RpcProviderService;
import org.opendaylight.mdsal.binding.api.WriteTransaction;
import org.opendaylight.mdsal.common.api.CommitInfo;
import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
import org.opendaylight.mdsal.dom.api.DOMActionProviderService;
import org.opendaylight.mdsal.dom.api.DOMMountPointService;
import org.opendaylight.mdsal.dom.api.DOMRpcProviderService;
import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceRegistration;
import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier;
import org.opendaylight.netconf.client.NetconfClientDispatcher;
import org.opendaylight.netconf.sal.connect.api.DeviceActionFactory;
import org.opendaylight.netconf.sal.connect.api.RemoteDeviceId;
import org.opendaylight.netconf.sal.connect.api.SchemaResourceManager;
import org.opendaylight.netconf.sal.connect.netconf.schema.mapping.BaseNetconfSchemas;
import org.opendaylight.netconf.topology.singleton.api.NetconfTopologySingletonService;
import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup;
import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup.NetconfTopologySetupBuilder;
import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologyUtils;
import org.opendaylight.netconf.topology.spi.NetconfNodeUtils;
import org.opendaylight.netconf.topology.spi.NetconfTopologyRPCProvider;
import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev221225.NetconfNode;
import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev221225.NetconfNodeTopologyService;
import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.topology.singleton.config.rev170419.Config;
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology;
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.TopologyId;
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyBuilder;
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey;
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
import org.opendaylight.yangtools.concepts.Registration;
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
70 public class NetconfTopologyManager
71 implements ClusteredDataTreeChangeListener<Node>, NetconfTopologySingletonService, AutoCloseable {
73 private static final Logger LOG = LoggerFactory.getLogger(NetconfTopologyManager.class);
75 private final Map<InstanceIdentifier<Node>, NetconfTopologyContext> contexts = new ConcurrentHashMap<>();
76 private final Map<InstanceIdentifier<Node>, ClusterSingletonServiceRegistration>
77 clusterRegistrations = new ConcurrentHashMap<>();
79 private final BaseNetconfSchemas baseSchemas;
80 private final DataBroker dataBroker;
81 private final DOMRpcProviderService rpcProviderRegistry;
82 private final DOMActionProviderService actionProviderRegistry;
83 private final ClusterSingletonServiceProvider clusterSingletonServiceProvider;
84 private final ScheduledExecutorService keepaliveExecutor;
85 private final ListeningExecutorService processingExecutor;
86 private final ActorSystem actorSystem;
87 private final EventExecutor eventExecutor;
88 private final NetconfClientDispatcher clientDispatcher;
89 private final String topologyId;
90 private final Duration writeTxIdleTimeout;
91 private final DOMMountPointService mountPointService;
92 private final AAAEncryptionService encryptionService;
93 private final RpcProviderService rpcProviderService;
94 private final DeviceActionFactory deviceActionFactory;
95 private final SchemaResourceManager resourceManager;
97 private ListenerRegistration<NetconfTopologyManager> dataChangeListenerRegistration;
98 private Registration rpcReg;
99 private String privateKeyPath;
100 private String privateKeyPassphrase;
/**
 * Creates the manager and captures its collaborators. Nothing is registered here; the datastore listener
 * and the RPC implementation are registered by the separate initialization step.
 *
 * <p>Most arguments are rejected with a {@link NullPointerException} when {@code null}.
 * {@code keepaliveExecutor}, {@code processingExecutor} and {@code config} are dereferenced directly, so a
 * {@code null} value fails the same way. {@code mountPointService} is stored without a null check.
 *
 * @param baseSchemas base NETCONF schemas
 * @param dataBroker data broker used for the topology write and the listener registration
 * @param rpcProviderRegistry DOM RPC provider service handed to each node setup
 * @param actionProviderService DOM action provider service handed to each node setup
 * @param clusterSingletonServiceProvider provider used to register one singleton service per node
 * @param keepaliveExecutor thread pool whose executor is handed to each node setup as keepalive executor
 * @param processingExecutor thread pool whose executor is wrapped in a listening decorator
 * @param actorSystemProvider source of the actor system
 * @param eventExecutor event executor handed to each node setup
 * @param clientDispatcher NETCONF client dispatcher handed to each node setup
 * @param topologyId identifier of the topology this manager handles
 * @param config configuration supplying the write transaction idle timeout, in seconds
 * @param mountPointService mount point service passed to every created topology context
 * @param encryptionService encryption service handed to each node setup and to the RPC provider
 * @param rpcProviderService binding RPC provider service used to register the topology RPC implementation
 * @param deviceActionFactory device action factory passed to every created topology context
 * @param resourceManager manager used to look up schema resources per node
 */
public NetconfTopologyManager(final BaseNetconfSchemas baseSchemas, final DataBroker dataBroker,
        final DOMRpcProviderService rpcProviderRegistry,
        final DOMActionProviderService actionProviderService,
        final ClusterSingletonServiceProvider clusterSingletonServiceProvider,
        final ScheduledThreadPool keepaliveExecutor, final ThreadPool processingExecutor,
        final ActorSystemProvider actorSystemProvider,
        final EventExecutor eventExecutor, final NetconfClientDispatcher clientDispatcher,
        final String topologyId, final Config config,
        final DOMMountPointService mountPointService,
        final AAAEncryptionService encryptionService,
        final RpcProviderService rpcProviderService,
        final DeviceActionFactory deviceActionFactory,
        final SchemaResourceManager resourceManager) {
    this.baseSchemas = requireNonNull(baseSchemas);
    this.dataBroker = requireNonNull(dataBroker);
    this.rpcProviderRegistry = requireNonNull(rpcProviderRegistry);
    this.actionProviderRegistry = requireNonNull(actionProviderService);
    this.clusterSingletonServiceProvider = requireNonNull(clusterSingletonServiceProvider);
    this.keepaliveExecutor = keepaliveExecutor.getExecutor();
    this.processingExecutor = MoreExecutors.listeningDecorator(processingExecutor.getExecutor());
    this.actorSystem = requireNonNull(actorSystemProvider).getActorSystem();
    this.eventExecutor = requireNonNull(eventExecutor);
    this.clientDispatcher = requireNonNull(clientDispatcher);
    this.topologyId = requireNonNull(topologyId);
    this.writeTxIdleTimeout = Duration.ofSeconds(config.getWriteTransactionIdleTimeout().toJava());
    // NOTE(review): unlike its neighbours this one is not null-checked — confirm whether null is allowed.
    this.mountPointService = mountPointService;
    this.encryptionService = requireNonNull(encryptionService);
    this.rpcProviderService = requireNonNull(rpcProviderService);
    this.deviceActionFactory = requireNonNull(deviceActionFactory);
    this.resourceManager = requireNonNull(resourceManager);
}
134 // Blueprint init method
136 dataChangeListenerRegistration = registerDataTreeChangeListener();
137 rpcReg = rpcProviderService.registerRpcImplementation(NetconfNodeTopologyService.class,
138 new NetconfTopologyRPCProvider(dataBroker, encryptionService, topologyId));
142 public void onDataTreeChanged(final Collection<DataTreeModification<Node>> changes) {
143 for (final DataTreeModification<Node> change : changes) {
144 final DataObjectModification<Node> rootNode = change.getRootNode();
145 final InstanceIdentifier<Node> dataModifIdent = change.getRootPath().getRootIdentifier();
146 final NodeId nodeId = NetconfTopologyUtils.getNodeId(rootNode.getIdentifier());
147 switch (rootNode.getModificationType()) {
148 case SUBTREE_MODIFIED:
149 LOG.debug("Config for node {} updated", nodeId);
150 refreshNetconfDeviceContext(dataModifIdent, rootNode.getDataAfter());
153 if (contexts.containsKey(dataModifIdent)) {
154 LOG.debug("RemoteDevice{{}} was already configured, reconfiguring node...", nodeId);
155 refreshNetconfDeviceContext(dataModifIdent, rootNode.getDataAfter());
157 LOG.debug("Config for node {} created", nodeId);
158 startNetconfDeviceContext(dataModifIdent, rootNode.getDataAfter());
162 LOG.debug("Config for node {} deleted", nodeId);
163 stopNetconfDeviceContext(dataModifIdent);
166 LOG.warn("Unknown operation for {}.", nodeId);
171 private void refreshNetconfDeviceContext(final InstanceIdentifier<Node> instanceIdentifier, final Node node) {
172 final NetconfTopologyContext context = contexts.get(instanceIdentifier);
173 context.refresh(createSetup(instanceIdentifier, node));
176 // ClusterSingletonServiceRegistration registerClusterSingletonService method throws a Runtime exception if there
177 // are problems with registration and client has to deal with it. Only thing we can do if this error occurs is to
178 // retry registration several times and log the error.
179 // TODO change to a specific documented Exception when changed in ClusterSingletonServiceProvider
180 @SuppressWarnings("checkstyle:IllegalCatch")
181 private void startNetconfDeviceContext(final InstanceIdentifier<Node> instanceIdentifier, final Node node) {
182 final NetconfNode netconfNode = requireNonNull(node.augmentation(NetconfNode.class));
184 final Timeout actorResponseWaitTime = Timeout.create(
185 Duration.ofSeconds(netconfNode.getActorResponseWaitTime().toJava()));
187 final ServiceGroupIdentifier serviceGroupIdent =
188 ServiceGroupIdentifier.create(instanceIdentifier.toString());
190 final NetconfTopologyContext newNetconfTopologyContext = newNetconfTopologyContext(
191 createSetup(instanceIdentifier, node), serviceGroupIdent, actorResponseWaitTime, deviceActionFactory);
196 final ClusterSingletonServiceRegistration clusterSingletonServiceRegistration =
197 clusterSingletonServiceProvider.registerClusterSingletonService(newNetconfTopologyContext);
198 clusterRegistrations.put(instanceIdentifier, clusterSingletonServiceRegistration);
199 contexts.put(instanceIdentifier, newNetconfTopologyContext);
201 } catch (final RuntimeException e) {
202 LOG.warn("Unable to register cluster singleton service {}, trying again", newNetconfTopologyContext, e);
205 LOG.error("Unable to register cluster singleton service {} - done trying, closing topology context",
206 newNetconfTopologyContext, e);
207 close(newNetconfTopologyContext);
214 private void stopNetconfDeviceContext(final InstanceIdentifier<Node> instanceIdentifier) {
215 final NetconfTopologyContext netconfTopologyContext = contexts.remove(instanceIdentifier);
216 if (netconfTopologyContext != null) {
217 close(clusterRegistrations.remove(instanceIdentifier));
218 close(netconfTopologyContext);
223 protected NetconfTopologyContext newNetconfTopologyContext(final NetconfTopologySetup setup,
224 final ServiceGroupIdentifier serviceGroupIdent, final Timeout actorResponseWaitTime,
225 final DeviceActionFactory deviceActionFact) {
226 return new NetconfTopologyContext(setup, serviceGroupIdent, actorResponseWaitTime, mountPointService,
231 public void close() {
232 if (rpcReg != null) {
236 if (dataChangeListenerRegistration != null) {
237 dataChangeListenerRegistration.close();
238 dataChangeListenerRegistration = null;
241 contexts.values().forEach(NetconfTopologyManager::close);
242 clusterRegistrations.values().forEach(NetconfTopologyManager::close);
245 clusterRegistrations.clear();
248 @SuppressWarnings("checkstyle:IllegalCatch")
249 private static void close(final AutoCloseable closeable) {
252 } catch (Exception e) {
253 LOG.warn("Error closing {}", closeable, e);
258 * Sets the private key path from location specified in configuration file using blueprint.
260 public void setPrivateKeyPath(final String privateKeyPath) {
261 this.privateKeyPath = privateKeyPath;
265 * Sets the private key passphrase from location specified in configuration file using blueprint.
267 public void setPrivateKeyPassphrase(final String privateKeyPassphrase) {
268 this.privateKeyPassphrase = privateKeyPassphrase;
271 private ListenerRegistration<NetconfTopologyManager> registerDataTreeChangeListener() {
272 final WriteTransaction wtx = dataBroker.newWriteOnlyTransaction();
273 // FIXME: how does this play out with lifecycle? In a cluster, someone needs to ensure this call happens, but
274 // also we need to to make sure config -> oper is properly synchronized. Non-clustered case relies on
275 // oper being transient and perhaps on a put() instead, how do we handle that in the clustered case?
276 wtx.merge(LogicalDatastoreType.OPERATIONAL, InstanceIdentifier.builder(NetworkTopology.class)
277 .child(Topology.class, new TopologyKey(new TopologyId(topologyId)))
278 .build(), new TopologyBuilder().setTopologyId(new TopologyId(topologyId)).build());
279 wtx.commit().addCallback(new FutureCallback<CommitInfo>() {
281 public void onSuccess(final CommitInfo result) {
282 LOG.debug("topology initialization successful");
286 public void onFailure(final Throwable throwable) {
287 LOG.error("Unable to initialize netconf-topology", throwable);
289 }, MoreExecutors.directExecutor());
291 LOG.debug("Registering datastore listener");
292 return dataBroker.registerDataTreeChangeListener(DataTreeIdentifier.create(LogicalDatastoreType.CONFIGURATION,
293 NetconfTopologyUtils.createTopologyListPath(topologyId).child(Node.class)), this);
296 private NetconfTopologySetup createSetup(final InstanceIdentifier<Node> instanceIdentifier, final Node node) {
297 final NetconfNode netconfNode = node.augmentation(NetconfNode.class);
298 final RemoteDeviceId deviceId = NetconfNodeUtils.toRemoteDeviceId(node.getNodeId(), netconfNode);
300 return NetconfTopologySetupBuilder.create()
301 .setClusterSingletonServiceProvider(clusterSingletonServiceProvider)
302 .setBaseSchemas(baseSchemas)
303 .setDataBroker(dataBroker)
304 .setInstanceIdentifier(instanceIdentifier)
305 .setRpcProviderRegistry(rpcProviderRegistry)
306 .setActionProviderRegistry(actionProviderRegistry)
308 .setActorSystem(actorSystem)
309 .setEventExecutor(eventExecutor)
310 .setKeepaliveExecutor(keepaliveExecutor)
311 .setProcessingExecutor(processingExecutor)
312 .setTopologyId(topologyId)
313 .setNetconfClientDispatcher(clientDispatcher)
314 .setSchemaResourceDTO(resourceManager.getSchemaResources(netconfNode.getSchemaCacheDirectory(),
316 .setIdleTimeout(writeTxIdleTimeout)
317 .setPrivateKeyPath(privateKeyPath)
318 .setPrivateKeyPassphrase(privateKeyPassphrase)
319 .setEncryptionService(encryptionService)