2 * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.netconf.topology.singleton.impl;
10 import static java.util.Objects.requireNonNull;
12 import akka.actor.ActorSystem;
13 import akka.util.Timeout;
14 import com.google.common.annotations.VisibleForTesting;
15 import com.google.common.util.concurrent.FutureCallback;
16 import com.google.common.util.concurrent.MoreExecutors;
17 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
18 import io.netty.util.concurrent.EventExecutor;
19 import java.time.Duration;
20 import java.util.Collection;
22 import java.util.concurrent.ConcurrentHashMap;
23 import java.util.concurrent.Executor;
24 import java.util.concurrent.ScheduledExecutorService;
25 import javax.annotation.PreDestroy;
26 import javax.inject.Inject;
27 import javax.inject.Singleton;
28 import org.opendaylight.aaa.encrypt.AAAEncryptionService;
29 import org.opendaylight.controller.cluster.ActorSystemProvider;
30 import org.opendaylight.controller.config.threadpool.ScheduledThreadPool;
31 import org.opendaylight.controller.config.threadpool.ThreadPool;
32 import org.opendaylight.mdsal.binding.api.ClusteredDataTreeChangeListener;
33 import org.opendaylight.mdsal.binding.api.DataBroker;
34 import org.opendaylight.mdsal.binding.api.DataObjectModification;
35 import org.opendaylight.mdsal.binding.api.DataTreeIdentifier;
36 import org.opendaylight.mdsal.binding.api.DataTreeModification;
37 import org.opendaylight.mdsal.binding.api.RpcProviderService;
38 import org.opendaylight.mdsal.binding.api.WriteTransaction;
39 import org.opendaylight.mdsal.common.api.CommitInfo;
40 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
41 import org.opendaylight.mdsal.dom.api.DOMMountPointService;
42 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
43 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceRegistration;
44 import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier;
45 import org.opendaylight.netconf.client.NetconfClientDispatcher;
46 import org.opendaylight.netconf.client.mdsal.api.BaseNetconfSchemas;
47 import org.opendaylight.netconf.client.mdsal.api.DeviceActionFactory;
48 import org.opendaylight.netconf.client.mdsal.api.RemoteDeviceId;
49 import org.opendaylight.netconf.client.mdsal.api.SchemaResourceManager;
50 import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup;
51 import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup.NetconfTopologySetupBuilder;
52 import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologyUtils;
53 import org.opendaylight.netconf.topology.spi.NetconfClientConfigurationBuilderFactory;
54 import org.opendaylight.netconf.topology.spi.NetconfNodeUtils;
55 import org.opendaylight.netconf.topology.spi.NetconfTopologyRPCProvider;
56 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev221225.NetconfNode;
57 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology;
58 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
59 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.TopologyId;
60 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
61 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyBuilder;
62 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey;
63 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
64 import org.opendaylight.yangtools.concepts.ListenerRegistration;
65 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
66 import org.opendaylight.yangtools.yang.common.Uint16;
67 import org.osgi.service.component.annotations.Activate;
68 import org.osgi.service.component.annotations.Component;
69 import org.osgi.service.component.annotations.Deactivate;
70 import org.osgi.service.component.annotations.Reference;
71 import org.osgi.service.metatype.annotations.AttributeDefinition;
72 import org.osgi.service.metatype.annotations.Designate;
73 import org.osgi.service.metatype.annotations.ObjectClassDefinition;
74 import org.slf4j.Logger;
75 import org.slf4j.LoggerFactory;
78 @Component(service = { }, configurationPid = "org.opendaylight.netconf.topology.singleton")
79 @Designate(ocd = NetconfTopologyManager.Configuration.class)
80 // Non-final for testing
81 public class NetconfTopologyManager implements ClusteredDataTreeChangeListener<Node>, AutoCloseable {
82 @ObjectClassDefinition
83 public @interface Configuration {
84 @AttributeDefinition(min = "1", description = "Name of the Network Topology instance to manage")
85 String topology$_$id() default "topology-netconf";
87 @AttributeDefinition(min = "0", max = "65535",
88 description = "Idle time in seconds after which write transaction is cancelled automatically. If 0, "
89 + "automatic cancellation is turned off.")
90 int write$_$transaction$_$idle$_$timeout() default 0;
    private static final Logger LOG = LoggerFactory.getLogger(NetconfTopologyManager.class);

    // Live per-device contexts, keyed by the device's config-datastore list-entry path.
    // ConcurrentHashMap because close() may race with DTCL callbacks.
    private final Map<InstanceIdentifier<Node>, NetconfTopologyContext> contexts = new ConcurrentHashMap<>();
    // Cluster-singleton registrations paired 1:1 with entries in 'contexts'
    private final Map<InstanceIdentifier<Node>, ClusterSingletonServiceRegistration>
            clusterRegistrations = new ConcurrentHashMap<>();

    private final BaseNetconfSchemas baseSchemas;
    private final DataBroker dataBroker;
    private final ClusterSingletonServiceProvider clusterSingletonServiceProvider;
    // Shared executor for device keepalive scheduling
    private final ScheduledExecutorService keepaliveExecutor;
    // Executor for message processing off the I/O threads
    private final Executor processingExecutor;
    private final ActorSystem actorSystem;
    private final EventExecutor eventExecutor;
    private final NetconfClientDispatcher clientDispatcher;
    // Name of the managed topology instance, e.g. "topology-netconf"
    private final String topologyId;
    // Zero duration disables automatic write-transaction cancellation
    private final Duration writeTxIdleTimeout;
    private final DOMMountPointService mountPointService;
    private final DeviceActionFactory deviceActionFactory;
    private final NetconfClientConfigurationBuilderFactory builderFactory;
    private final SchemaResourceManager resourceManager;

    // Mutable: released and nulled in close()
    private ListenerRegistration<NetconfTopologyManager> dataChangeListenerRegistration;
    private NetconfTopologyRPCProvider rpcProvider;
118 public NetconfTopologyManager(@Reference final BaseNetconfSchemas baseSchemas,
119 @Reference final DataBroker dataBroker,
120 @Reference final ClusterSingletonServiceProvider clusterSingletonServiceProvider,
121 @Reference(target = "(type=global-netconf-ssh-scheduled-executor)")
122 final ScheduledThreadPool keepaliveExecutor,
123 @Reference(target = "(type=global-netconf-processing-executor)") final ThreadPool processingExecutor,
124 @Reference final ActorSystemProvider actorSystemProvider,
125 @Reference(target = "(type=global-event-executor)") final EventExecutor eventExecutor,
126 @Reference(target = "(type=netconf-client-dispatcher)") final NetconfClientDispatcher clientDispatcher,
127 @Reference final DOMMountPointService mountPointService,
128 @Reference final AAAEncryptionService encryptionService,
129 @Reference final RpcProviderService rpcProviderService,
130 @Reference final DeviceActionFactory deviceActionFactory,
131 @Reference final SchemaResourceManager resourceManager,
132 @Reference final NetconfClientConfigurationBuilderFactory builderFactory,
133 final Configuration configuration) {
134 this(baseSchemas, dataBroker, clusterSingletonServiceProvider, keepaliveExecutor.getExecutor(),
135 processingExecutor.getExecutor(), actorSystemProvider.getActorSystem(), eventExecutor, clientDispatcher,
136 mountPointService, encryptionService, rpcProviderService, deviceActionFactory, resourceManager,
137 builderFactory, configuration.topology$_$id(),
138 Uint16.valueOf(configuration.write$_$transaction$_$idle$_$timeout()));
142 public NetconfTopologyManager(final BaseNetconfSchemas baseSchemas, final DataBroker dataBroker,
143 final ClusterSingletonServiceProvider clusterSingletonServiceProvider,
144 final ScheduledThreadPool keepaliveExecutor, final ThreadPool processingExecutor,
145 final ActorSystemProvider actorSystemProvider, final EventExecutor eventExecutor,
146 final NetconfClientDispatcher clientDispatcher, final DOMMountPointService mountPointService,
147 final AAAEncryptionService encryptionService, final RpcProviderService rpcProviderService,
148 final DeviceActionFactory deviceActionFactory, final SchemaResourceManager resourceManager,
149 final NetconfClientConfigurationBuilderFactory builderFactory) {
150 this(baseSchemas, dataBroker, clusterSingletonServiceProvider, keepaliveExecutor.getExecutor(),
151 processingExecutor.getExecutor(), actorSystemProvider.getActorSystem(), eventExecutor, clientDispatcher,
152 mountPointService, encryptionService, rpcProviderService, deviceActionFactory, resourceManager,
153 builderFactory, NetconfNodeUtils.DEFAULT_TOPOLOGY_NAME, Uint16.ZERO);
156 @SuppressFBWarnings(value = "MC_OVERRIDABLE_METHOD_CALL_IN_CONSTRUCTOR",
157 justification = "Non-final for mocking, but we register for DTCL and that leaks 'this'")
158 public NetconfTopologyManager(final BaseNetconfSchemas baseSchemas, final DataBroker dataBroker,
159 final ClusterSingletonServiceProvider clusterSingletonServiceProvider,
160 final ScheduledExecutorService keepaliveExecutor, final Executor processingExecutor,
161 final ActorSystem actorSystem, final EventExecutor eventExecutor,
162 final NetconfClientDispatcher clientDispatcher, final DOMMountPointService mountPointService,
163 final AAAEncryptionService encryptionService, final RpcProviderService rpcProviderService,
164 final DeviceActionFactory deviceActionFactory, final SchemaResourceManager resourceManager,
165 final NetconfClientConfigurationBuilderFactory builderFactory, final String topologyId,
166 final Uint16 writeTransactionIdleTimeout) {
167 this.baseSchemas = requireNonNull(baseSchemas);
168 this.dataBroker = requireNonNull(dataBroker);
169 this.clusterSingletonServiceProvider = requireNonNull(clusterSingletonServiceProvider);
170 this.keepaliveExecutor = requireNonNull(keepaliveExecutor);
171 this.processingExecutor = requireNonNull(processingExecutor);
172 this.actorSystem = requireNonNull(actorSystem);
173 this.eventExecutor = requireNonNull(eventExecutor);
174 this.clientDispatcher = requireNonNull(clientDispatcher);
175 this.topologyId = requireNonNull(topologyId);
176 writeTxIdleTimeout = Duration.ofSeconds(writeTransactionIdleTimeout.toJava());
177 this.mountPointService = mountPointService;
178 this.deviceActionFactory = requireNonNull(deviceActionFactory);
179 this.resourceManager = requireNonNull(resourceManager);
180 this.builderFactory = requireNonNull(builderFactory);
182 dataChangeListenerRegistration = registerDataTreeChangeListener();
183 rpcProvider = new NetconfTopologyRPCProvider(rpcProviderService, dataBroker, encryptionService, topologyId);
187 public void onDataTreeChanged(final Collection<DataTreeModification<Node>> changes) {
188 for (final DataTreeModification<Node> change : changes) {
189 final DataObjectModification<Node> rootNode = change.getRootNode();
190 final InstanceIdentifier<Node> dataModifIdent = change.getRootPath().getRootIdentifier();
191 final NodeId nodeId = NetconfTopologyUtils.getNodeId(rootNode.getIdentifier());
192 switch (rootNode.getModificationType()) {
193 case SUBTREE_MODIFIED:
194 LOG.debug("Config for node {} updated", nodeId);
195 refreshNetconfDeviceContext(dataModifIdent, rootNode.getDataAfter());
198 if (contexts.containsKey(dataModifIdent)) {
199 LOG.debug("RemoteDevice{{}} was already configured, reconfiguring node...", nodeId);
200 refreshNetconfDeviceContext(dataModifIdent, rootNode.getDataAfter());
202 LOG.debug("Config for node {} created", nodeId);
203 startNetconfDeviceContext(dataModifIdent, rootNode.getDataAfter());
207 LOG.debug("Config for node {} deleted", nodeId);
208 stopNetconfDeviceContext(dataModifIdent);
211 LOG.warn("Unknown operation for {}.", nodeId);
216 private void refreshNetconfDeviceContext(final InstanceIdentifier<Node> instanceIdentifier, final Node node) {
217 final NetconfTopologyContext context = contexts.get(instanceIdentifier);
218 context.refresh(createSetup(instanceIdentifier, node));
221 // ClusterSingletonServiceRegistration registerClusterSingletonService method throws a Runtime exception if there
222 // are problems with registration and client has to deal with it. Only thing we can do if this error occurs is to
223 // retry registration several times and log the error.
224 // TODO change to a specific documented Exception when changed in ClusterSingletonServiceProvider
225 @SuppressWarnings("checkstyle:IllegalCatch")
226 private void startNetconfDeviceContext(final InstanceIdentifier<Node> instanceIdentifier, final Node node) {
227 final NetconfNode netconfNode = requireNonNull(node.augmentation(NetconfNode.class));
229 final Timeout actorResponseWaitTime = Timeout.create(
230 Duration.ofSeconds(netconfNode.getActorResponseWaitTime().toJava()));
232 final ServiceGroupIdentifier serviceGroupIdent =
233 ServiceGroupIdentifier.create(instanceIdentifier.toString());
235 final NetconfTopologyContext newNetconfTopologyContext = newNetconfTopologyContext(
236 createSetup(instanceIdentifier, node), serviceGroupIdent, actorResponseWaitTime, deviceActionFactory);
241 final ClusterSingletonServiceRegistration clusterSingletonServiceRegistration =
242 clusterSingletonServiceProvider.registerClusterSingletonService(newNetconfTopologyContext);
243 clusterRegistrations.put(instanceIdentifier, clusterSingletonServiceRegistration);
244 contexts.put(instanceIdentifier, newNetconfTopologyContext);
246 } catch (final RuntimeException e) {
247 LOG.warn("Unable to register cluster singleton service {}, trying again", newNetconfTopologyContext, e);
250 LOG.error("Unable to register cluster singleton service {} - done trying, closing topology context",
251 newNetconfTopologyContext, e);
252 close(newNetconfTopologyContext);
259 private void stopNetconfDeviceContext(final InstanceIdentifier<Node> instanceIdentifier) {
260 final NetconfTopologyContext netconfTopologyContext = contexts.remove(instanceIdentifier);
261 if (netconfTopologyContext != null) {
262 close(clusterRegistrations.remove(instanceIdentifier));
263 close(netconfTopologyContext);
268 protected NetconfTopologyContext newNetconfTopologyContext(final NetconfTopologySetup setup,
269 final ServiceGroupIdentifier serviceGroupIdent, final Timeout actorResponseWaitTime,
270 final DeviceActionFactory deviceActionFact) {
271 return new NetconfTopologyContext(resourceManager, mountPointService, builderFactory, deviceActionFactory,
272 actorResponseWaitTime, serviceGroupIdent, setup);
278 public void close() {
279 if (rpcProvider != null) {
283 if (dataChangeListenerRegistration != null) {
284 dataChangeListenerRegistration.close();
285 dataChangeListenerRegistration = null;
288 contexts.values().forEach(NetconfTopologyManager::close);
289 clusterRegistrations.values().forEach(NetconfTopologyManager::close);
292 clusterRegistrations.clear();
295 @SuppressWarnings("checkstyle:IllegalCatch")
296 private static void close(final AutoCloseable closeable) {
299 } catch (Exception e) {
300 LOG.warn("Error closing {}", closeable, e);
304 private ListenerRegistration<NetconfTopologyManager> registerDataTreeChangeListener() {
305 final WriteTransaction wtx = dataBroker.newWriteOnlyTransaction();
306 // FIXME: how does this play out with lifecycle? In a cluster, someone needs to ensure this call happens, but
307 // also we need to to make sure config -> oper is properly synchronized. Non-clustered case relies on
308 // oper being transient and perhaps on a put() instead, how do we handle that in the clustered case?
309 wtx.merge(LogicalDatastoreType.OPERATIONAL, InstanceIdentifier.builder(NetworkTopology.class)
310 .child(Topology.class, new TopologyKey(new TopologyId(topologyId)))
311 .build(), new TopologyBuilder().setTopologyId(new TopologyId(topologyId)).build());
312 wtx.commit().addCallback(new FutureCallback<CommitInfo>() {
314 public void onSuccess(final CommitInfo result) {
315 LOG.debug("topology initialization successful");
319 public void onFailure(final Throwable throwable) {
320 LOG.error("Unable to initialize netconf-topology", throwable);
322 }, MoreExecutors.directExecutor());
324 LOG.debug("Registering datastore listener");
325 return dataBroker.registerDataTreeChangeListener(DataTreeIdentifier.create(LogicalDatastoreType.CONFIGURATION,
326 NetconfTopologyUtils.createTopologyListPath(topologyId).child(Node.class)), this);
329 private NetconfTopologySetup createSetup(final InstanceIdentifier<Node> instanceIdentifier, final Node node) {
330 final NetconfNode netconfNode = node.augmentation(NetconfNode.class);
331 final RemoteDeviceId deviceId = NetconfNodeUtils.toRemoteDeviceId(node.getNodeId(), netconfNode);
333 return NetconfTopologySetupBuilder.create()
334 .setClusterSingletonServiceProvider(clusterSingletonServiceProvider)
335 .setBaseSchemas(baseSchemas)
336 .setDataBroker(dataBroker)
337 .setInstanceIdentifier(instanceIdentifier)
339 .setActorSystem(actorSystem)
340 .setEventExecutor(eventExecutor)
341 .setKeepaliveExecutor(keepaliveExecutor)
342 .setProcessingExecutor(processingExecutor)
343 .setTopologyId(topologyId)
344 .setNetconfClientDispatcher(clientDispatcher)
345 .setSchemaResourceDTO(resourceManager.getSchemaResources(netconfNode.getSchemaCacheDirectory(),
347 .setIdleTimeout(writeTxIdleTimeout)