2 * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.netconf.topology.singleton.impl;
10 import static java.util.Objects.requireNonNull;
12 import akka.actor.ActorSystem;
13 import akka.util.Timeout;
14 import com.google.common.annotations.VisibleForTesting;
15 import com.google.common.util.concurrent.FutureCallback;
16 import com.google.common.util.concurrent.MoreExecutors;
17 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
18 import io.netty.util.Timer;
19 import java.time.Duration;
20 import java.util.Collection;
22 import java.util.concurrent.ConcurrentHashMap;
23 import java.util.concurrent.Executor;
24 import javax.annotation.PreDestroy;
25 import javax.inject.Inject;
26 import javax.inject.Singleton;
27 import org.opendaylight.aaa.encrypt.AAAEncryptionService;
28 import org.opendaylight.controller.cluster.ActorSystemProvider;
29 import org.opendaylight.controller.config.threadpool.ThreadPool;
30 import org.opendaylight.mdsal.binding.api.ClusteredDataTreeChangeListener;
31 import org.opendaylight.mdsal.binding.api.DataBroker;
32 import org.opendaylight.mdsal.binding.api.DataObjectModification;
33 import org.opendaylight.mdsal.binding.api.DataTreeIdentifier;
34 import org.opendaylight.mdsal.binding.api.DataTreeModification;
35 import org.opendaylight.mdsal.binding.api.RpcProviderService;
36 import org.opendaylight.mdsal.binding.api.WriteTransaction;
37 import org.opendaylight.mdsal.common.api.CommitInfo;
38 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
39 import org.opendaylight.mdsal.dom.api.DOMMountPointService;
40 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
41 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceRegistration;
42 import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier;
43 import org.opendaylight.netconf.client.NetconfClientFactory;
44 import org.opendaylight.netconf.client.mdsal.api.BaseNetconfSchemas;
45 import org.opendaylight.netconf.client.mdsal.api.DeviceActionFactory;
46 import org.opendaylight.netconf.client.mdsal.api.RemoteDeviceId;
47 import org.opendaylight.netconf.client.mdsal.api.SchemaResourceManager;
48 import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup;
49 import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologyUtils;
50 import org.opendaylight.netconf.topology.spi.NetconfClientConfigurationBuilderFactory;
51 import org.opendaylight.netconf.topology.spi.NetconfNodeUtils;
52 import org.opendaylight.netconf.topology.spi.NetconfTopologyRPCProvider;
53 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev221225.NetconfNode;
54 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology;
55 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
56 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.TopologyId;
57 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
58 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyBuilder;
59 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey;
60 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
61 import org.opendaylight.yangtools.concepts.ListenerRegistration;
62 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
63 import org.opendaylight.yangtools.yang.common.Uint16;
64 import org.osgi.service.component.annotations.Activate;
65 import org.osgi.service.component.annotations.Component;
66 import org.osgi.service.component.annotations.Deactivate;
67 import org.osgi.service.component.annotations.Reference;
68 import org.osgi.service.metatype.annotations.AttributeDefinition;
69 import org.osgi.service.metatype.annotations.Designate;
70 import org.osgi.service.metatype.annotations.ObjectClassDefinition;
71 import org.slf4j.Logger;
72 import org.slf4j.LoggerFactory;
75 @Component(service = { }, configurationPid = "org.opendaylight.netconf.topology.singleton")
76 @Designate(ocd = NetconfTopologyManager.Configuration.class)
77 // Non-final for testing
78 public class NetconfTopologyManager implements ClusteredDataTreeChangeListener<Node>, AutoCloseable {
79 @ObjectClassDefinition
80 public @interface Configuration {
81 @AttributeDefinition(min = "1", description = "Name of the Network Topology instance to manage")
82 String topology$_$id() default "topology-netconf";
84 @AttributeDefinition(min = "0", max = "65535",
85 description = "Idle time in seconds after which write transaction is cancelled automatically. If 0, "
86 + "automatic cancellation is turned off.")
87 int write$_$transaction$_$idle$_$timeout() default 0;
90 private static final Logger LOG = LoggerFactory.getLogger(NetconfTopologyManager.class);
92 private final Map<InstanceIdentifier<Node>, NetconfTopologyContext> contexts = new ConcurrentHashMap<>();
93 private final Map<InstanceIdentifier<Node>, ClusterSingletonServiceRegistration>
94 clusterRegistrations = new ConcurrentHashMap<>();
96 private final BaseNetconfSchemas baseSchemas;
97 private final DataBroker dataBroker;
98 private final ClusterSingletonServiceProvider clusterSingletonServiceProvider;
99 private final Timer timer;
100 private final Executor processingExecutor;
101 private final ActorSystem actorSystem;
102 private final NetconfClientFactory clientFactory;
103 private final String topologyId;
104 private final Duration writeTxIdleTimeout;
105 private final DOMMountPointService mountPointService;
106 private final DeviceActionFactory deviceActionFactory;
107 private final NetconfClientConfigurationBuilderFactory builderFactory;
108 private final SchemaResourceManager resourceManager;
110 private ListenerRegistration<NetconfTopologyManager> dataChangeListenerRegistration;
111 private NetconfTopologyRPCProvider rpcProvider;
114 public NetconfTopologyManager(@Reference final BaseNetconfSchemas baseSchemas,
115 @Reference final DataBroker dataBroker,
116 @Reference final ClusterSingletonServiceProvider clusterSingletonServiceProvider,
117 @Reference(target = "(type=global-timer)") final Timer timer,
118 @Reference(target = "(type=global-netconf-processing-executor)") final ThreadPool processingExecutor,
119 @Reference final ActorSystemProvider actorSystemProvider,
120 @Reference(target = "(type=netconf-client-factory)") final NetconfClientFactory clientFactory,
121 @Reference final DOMMountPointService mountPointService,
122 @Reference final AAAEncryptionService encryptionService,
123 @Reference final RpcProviderService rpcProviderService,
124 @Reference final DeviceActionFactory deviceActionFactory,
125 @Reference final SchemaResourceManager resourceManager,
126 @Reference final NetconfClientConfigurationBuilderFactory builderFactory,
127 final Configuration configuration) {
128 this(baseSchemas, dataBroker, clusterSingletonServiceProvider, timer, processingExecutor.getExecutor(),
129 actorSystemProvider.getActorSystem(), clientFactory, mountPointService, encryptionService,
130 rpcProviderService, deviceActionFactory, resourceManager, builderFactory, configuration.topology$_$id(),
131 Uint16.valueOf(configuration.write$_$transaction$_$idle$_$timeout()));
135 public NetconfTopologyManager(final BaseNetconfSchemas baseSchemas, final DataBroker dataBroker,
136 final ClusterSingletonServiceProvider clusterSingletonServiceProvider, final Timer timer,
137 final ThreadPool processingExecutor, final ActorSystemProvider actorSystemProvider,
138 final NetconfClientFactory clientFactory, final DOMMountPointService mountPointService,
139 final AAAEncryptionService encryptionService, final RpcProviderService rpcProviderService,
140 final DeviceActionFactory deviceActionFactory, final SchemaResourceManager resourceManager,
141 final NetconfClientConfigurationBuilderFactory builderFactory) {
142 this(baseSchemas, dataBroker, clusterSingletonServiceProvider, timer, processingExecutor.getExecutor(),
143 actorSystemProvider.getActorSystem(), clientFactory, mountPointService, encryptionService,
144 rpcProviderService, deviceActionFactory, resourceManager, builderFactory,
145 NetconfNodeUtils.DEFAULT_TOPOLOGY_NAME, Uint16.ZERO);
148 @SuppressFBWarnings(value = "MC_OVERRIDABLE_METHOD_CALL_IN_CONSTRUCTOR",
149 justification = "Non-final for mocking, but we register for DTCL and that leaks 'this'")
150 public NetconfTopologyManager(final BaseNetconfSchemas baseSchemas, final DataBroker dataBroker,
151 final ClusterSingletonServiceProvider clusterSingletonServiceProvider, final Timer timer,
152 final Executor processingExecutor, final ActorSystem actorSystem, final NetconfClientFactory clientFactory,
153 final DOMMountPointService mountPointService, final AAAEncryptionService encryptionService,
154 final RpcProviderService rpcProviderService, final DeviceActionFactory deviceActionFactory,
155 final SchemaResourceManager resourceManager, final NetconfClientConfigurationBuilderFactory builderFactory,
156 final String topologyId, final Uint16 writeTransactionIdleTimeout) {
157 this.baseSchemas = requireNonNull(baseSchemas);
158 this.dataBroker = requireNonNull(dataBroker);
159 this.clusterSingletonServiceProvider = requireNonNull(clusterSingletonServiceProvider);
160 this.timer = requireNonNull(timer);
161 this.processingExecutor = requireNonNull(processingExecutor);
162 this.actorSystem = requireNonNull(actorSystem);
163 this.clientFactory = requireNonNull(clientFactory);
164 this.topologyId = requireNonNull(topologyId);
165 writeTxIdleTimeout = Duration.ofSeconds(writeTransactionIdleTimeout.toJava());
166 this.mountPointService = mountPointService;
167 this.deviceActionFactory = requireNonNull(deviceActionFactory);
168 this.resourceManager = requireNonNull(resourceManager);
169 this.builderFactory = requireNonNull(builderFactory);
171 dataChangeListenerRegistration = registerDataTreeChangeListener();
172 rpcProvider = new NetconfTopologyRPCProvider(rpcProviderService, dataBroker, encryptionService, topologyId);
176 public void onDataTreeChanged(final Collection<DataTreeModification<Node>> changes) {
177 for (final DataTreeModification<Node> change : changes) {
178 final DataObjectModification<Node> rootNode = change.getRootNode();
179 final InstanceIdentifier<Node> dataModifIdent = change.getRootPath().getRootIdentifier();
180 final NodeId nodeId = NetconfTopologyUtils.getNodeId(rootNode.getIdentifier());
181 switch (rootNode.getModificationType()) {
182 case SUBTREE_MODIFIED:
183 LOG.debug("Config for node {} updated", nodeId);
184 refreshNetconfDeviceContext(dataModifIdent, rootNode.getDataAfter());
187 if (contexts.containsKey(dataModifIdent)) {
188 LOG.debug("RemoteDevice{{}} was already configured, reconfiguring node...", nodeId);
189 refreshNetconfDeviceContext(dataModifIdent, rootNode.getDataAfter());
191 LOG.debug("Config for node {} created", nodeId);
192 startNetconfDeviceContext(dataModifIdent, rootNode.getDataAfter());
196 LOG.debug("Config for node {} deleted", nodeId);
197 stopNetconfDeviceContext(dataModifIdent);
200 LOG.warn("Unknown operation for {}.", nodeId);
205 private void refreshNetconfDeviceContext(final InstanceIdentifier<Node> instanceIdentifier, final Node node) {
206 final NetconfTopologyContext context = contexts.get(instanceIdentifier);
207 context.refresh(createSetup(instanceIdentifier, node));
210 // ClusterSingletonServiceRegistration registerClusterSingletonService method throws a Runtime exception if there
211 // are problems with registration and client has to deal with it. Only thing we can do if this error occurs is to
212 // retry registration several times and log the error.
213 // TODO change to a specific documented Exception when changed in ClusterSingletonServiceProvider
214 @SuppressWarnings("checkstyle:IllegalCatch")
215 private void startNetconfDeviceContext(final InstanceIdentifier<Node> instanceIdentifier, final Node node) {
216 final NetconfNode netconfNode = requireNonNull(node.augmentation(NetconfNode.class));
218 final Timeout actorResponseWaitTime = Timeout.create(
219 Duration.ofSeconds(netconfNode.getActorResponseWaitTime().toJava()));
221 final ServiceGroupIdentifier serviceGroupIdent =
222 ServiceGroupIdentifier.create(instanceIdentifier.toString());
224 final NetconfTopologyContext newNetconfTopologyContext = newNetconfTopologyContext(
225 createSetup(instanceIdentifier, node), serviceGroupIdent, actorResponseWaitTime, deviceActionFactory);
230 final ClusterSingletonServiceRegistration clusterSingletonServiceRegistration =
231 clusterSingletonServiceProvider.registerClusterSingletonService(newNetconfTopologyContext);
232 clusterRegistrations.put(instanceIdentifier, clusterSingletonServiceRegistration);
233 contexts.put(instanceIdentifier, newNetconfTopologyContext);
235 } catch (final RuntimeException e) {
236 LOG.warn("Unable to register cluster singleton service {}, trying again", newNetconfTopologyContext, e);
239 LOG.error("Unable to register cluster singleton service {} - done trying, closing topology context",
240 newNetconfTopologyContext, e);
241 close(newNetconfTopologyContext);
248 private void stopNetconfDeviceContext(final InstanceIdentifier<Node> instanceIdentifier) {
249 final NetconfTopologyContext netconfTopologyContext = contexts.remove(instanceIdentifier);
250 if (netconfTopologyContext != null) {
251 close(clusterRegistrations.remove(instanceIdentifier));
252 close(netconfTopologyContext);
257 protected NetconfTopologyContext newNetconfTopologyContext(final NetconfTopologySetup setup,
258 final ServiceGroupIdentifier serviceGroupIdent, final Timeout actorResponseWaitTime,
259 final DeviceActionFactory deviceActionFact) {
260 return new NetconfTopologyContext(resourceManager, mountPointService, builderFactory, deviceActionFactory,
261 actorResponseWaitTime, serviceGroupIdent, setup);
267 public void close() {
268 if (rpcProvider != null) {
272 if (dataChangeListenerRegistration != null) {
273 dataChangeListenerRegistration.close();
274 dataChangeListenerRegistration = null;
277 contexts.values().forEach(NetconfTopologyManager::close);
278 clusterRegistrations.values().forEach(NetconfTopologyManager::close);
281 clusterRegistrations.clear();
284 @SuppressWarnings("checkstyle:IllegalCatch")
285 private static void close(final AutoCloseable closeable) {
288 } catch (Exception e) {
289 LOG.warn("Error closing {}", closeable, e);
293 private ListenerRegistration<NetconfTopologyManager> registerDataTreeChangeListener() {
294 final WriteTransaction wtx = dataBroker.newWriteOnlyTransaction();
295 // FIXME: how does this play out with lifecycle? In a cluster, someone needs to ensure this call happens, but
296 // also we need to to make sure config -> oper is properly synchronized. Non-clustered case relies on
297 // oper being transient and perhaps on a put() instead, how do we handle that in the clustered case?
298 wtx.merge(LogicalDatastoreType.OPERATIONAL, InstanceIdentifier.builder(NetworkTopology.class)
299 .child(Topology.class, new TopologyKey(new TopologyId(topologyId)))
300 .build(), new TopologyBuilder().setTopologyId(new TopologyId(topologyId)).build());
301 wtx.commit().addCallback(new FutureCallback<CommitInfo>() {
303 public void onSuccess(final CommitInfo result) {
304 LOG.debug("topology initialization successful");
308 public void onFailure(final Throwable throwable) {
309 LOG.error("Unable to initialize netconf-topology", throwable);
311 }, MoreExecutors.directExecutor());
313 LOG.debug("Registering datastore listener");
314 return dataBroker.registerDataTreeChangeListener(DataTreeIdentifier.create(LogicalDatastoreType.CONFIGURATION,
315 NetconfTopologyUtils.createTopologyListPath(topologyId).child(Node.class)), this);
318 private NetconfTopologySetup createSetup(final InstanceIdentifier<Node> instanceIdentifier, final Node node) {
319 final NetconfNode netconfNode = node.augmentation(NetconfNode.class);
320 final RemoteDeviceId deviceId = NetconfNodeUtils.toRemoteDeviceId(node.getNodeId(), netconfNode);
322 return NetconfTopologySetup.builder()
323 .setClusterSingletonServiceProvider(clusterSingletonServiceProvider)
324 .setBaseSchemas(baseSchemas)
325 .setDataBroker(dataBroker)
326 .setInstanceIdentifier(instanceIdentifier)
328 .setActorSystem(actorSystem)
330 .setProcessingExecutor(processingExecutor)
331 .setTopologyId(topologyId)
332 .setNetconfClientFactory(clientFactory)
333 .setSchemaResourceDTO(resourceManager.getSchemaResources(netconfNode.getSchemaCacheDirectory(), deviceId))
334 .setIdleTimeout(writeTxIdleTimeout)