2 * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.netconf.topology.singleton.impl;
10 import static java.util.Objects.requireNonNull;
12 import akka.actor.ActorSystem;
13 import akka.util.Timeout;
14 import com.google.common.annotations.VisibleForTesting;
15 import com.google.common.util.concurrent.FutureCallback;
16 import com.google.common.util.concurrent.MoreExecutors;
17 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
18 import io.netty.util.concurrent.EventExecutor;
19 import java.time.Duration;
20 import java.util.Collection;
22 import java.util.concurrent.ConcurrentHashMap;
23 import java.util.concurrent.Executor;
24 import java.util.concurrent.ScheduledExecutorService;
25 import javax.annotation.PreDestroy;
26 import javax.inject.Inject;
27 import javax.inject.Singleton;
28 import org.opendaylight.aaa.encrypt.AAAEncryptionService;
29 import org.opendaylight.controller.cluster.ActorSystemProvider;
30 import org.opendaylight.controller.config.threadpool.ScheduledThreadPool;
31 import org.opendaylight.controller.config.threadpool.ThreadPool;
32 import org.opendaylight.mdsal.binding.api.ClusteredDataTreeChangeListener;
33 import org.opendaylight.mdsal.binding.api.DataBroker;
34 import org.opendaylight.mdsal.binding.api.DataObjectModification;
35 import org.opendaylight.mdsal.binding.api.DataTreeIdentifier;
36 import org.opendaylight.mdsal.binding.api.DataTreeModification;
37 import org.opendaylight.mdsal.binding.api.RpcProviderService;
38 import org.opendaylight.mdsal.binding.api.WriteTransaction;
39 import org.opendaylight.mdsal.common.api.CommitInfo;
40 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
41 import org.opendaylight.mdsal.dom.api.DOMMountPointService;
42 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
43 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceRegistration;
44 import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier;
45 import org.opendaylight.netconf.client.NetconfClientDispatcher;
46 import org.opendaylight.netconf.client.mdsal.api.BaseNetconfSchemas;
47 import org.opendaylight.netconf.client.mdsal.api.DeviceActionFactory;
48 import org.opendaylight.netconf.client.mdsal.api.RemoteDeviceId;
49 import org.opendaylight.netconf.client.mdsal.api.SchemaResourceManager;
50 import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup;
51 import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup.NetconfTopologySetupBuilder;
52 import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologyUtils;
53 import org.opendaylight.netconf.topology.spi.NetconfClientConfigurationBuilderFactory;
54 import org.opendaylight.netconf.topology.spi.NetconfNodeUtils;
55 import org.opendaylight.netconf.topology.spi.NetconfTopologyRPCProvider;
56 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev221225.NetconfNode;
57 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev221225.NetconfNodeTopologyService;
58 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology;
59 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
60 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.TopologyId;
61 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
62 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyBuilder;
63 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey;
64 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
65 import org.opendaylight.yangtools.concepts.ListenerRegistration;
66 import org.opendaylight.yangtools.concepts.Registration;
67 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
68 import org.opendaylight.yangtools.yang.common.Uint16;
69 import org.osgi.service.component.annotations.Activate;
70 import org.osgi.service.component.annotations.Component;
71 import org.osgi.service.component.annotations.Deactivate;
72 import org.osgi.service.component.annotations.Reference;
73 import org.osgi.service.metatype.annotations.AttributeDefinition;
74 import org.osgi.service.metatype.annotations.Designate;
75 import org.osgi.service.metatype.annotations.ObjectClassDefinition;
76 import org.slf4j.Logger;
77 import org.slf4j.LoggerFactory;
80 @Component(service = { }, configurationPid = "org.opendaylight.netconf.topology.singleton")
81 @Designate(ocd = NetconfTopologyManager.Configuration.class)
82 // Non-final for testing
83 public class NetconfTopologyManager implements ClusteredDataTreeChangeListener<Node>, AutoCloseable {
/**
 * Metatype configuration of this component. The constructor that accepts a {@code Configuration} reads
 * {@code topology$_$id()} as the topology identifier and passes {@code write$_$transaction$_$idle$_$timeout()}
 * through {@code Uint16.valueOf} as the write transaction idle timeout.
 */
84 @ObjectClassDefinition
85 public @interface Configuration {
// Defaults to "topology-netconf".
// NOTE(review): under the OSGi property-name mapping "$_$" presumably becomes "-", which would make the
// configuration property "topology-id" - confirm against the OSGi Declarative Services specification.
86 @AttributeDefinition(min = "1", description = "Name of the Network Topology instance to manage")
87 String topology$_$id() default "topology-netconf";
// Expressed in seconds according to the description; the default of 0 turns automatic cancellation off.
89 @AttributeDefinition(min = "0", max = "65535",
90 description = "Idle time in seconds after which write transaction is cancelled automatically. If 0, "
91 + "automatic cancellation is turned off.")
92 int write$_$transaction$_$idle$_$timeout() default 0;
95 private static final Logger LOG = LoggerFactory.getLogger(NetconfTopologyManager.class);
// Topology contexts keyed by the node's instance identifier; entries are added by startNetconfDeviceContext()
// and removed by stopNetconfDeviceContext().
97 private final Map<InstanceIdentifier<Node>, NetconfTopologyContext> contexts = new ConcurrentHashMap<>();
// Cluster singleton registrations keyed by the same identifier; maintained alongside 'contexts'.
98 private final Map<InstanceIdentifier<Node>, ClusterSingletonServiceRegistration>
99 clusterRegistrations = new ConcurrentHashMap<>();
// Dependencies supplied through the constructor.
101 private final BaseNetconfSchemas baseSchemas;
102 private final DataBroker dataBroker;
103 private final ClusterSingletonServiceProvider clusterSingletonServiceProvider;
104 private final ScheduledExecutorService keepaliveExecutor;
105 private final Executor processingExecutor;
106 private final ActorSystem actorSystem;
107 private final EventExecutor eventExecutor;
108 private final NetconfClientDispatcher clientDispatcher;
// Identifier of the managed topology; used for the operational topology entry and the listener path.
109 private final String topologyId;
// Built in the constructor with Duration.ofSeconds() from the configured Uint16 value.
110 private final Duration writeTxIdleTimeout;
111 private final DOMMountPointService mountPointService;
112 private final AAAEncryptionService encryptionService;
113 private final RpcProviderService rpcProviderService;
114 private final DeviceActionFactory deviceActionFactory;
115 private final NetconfClientConfigurationBuilderFactory builderFactory;
116 private final SchemaResourceManager resourceManager;
// Both registrations are assigned at the end of the full constructor and are null-checked in close().
118 private ListenerRegistration<NetconfTopologyManager> dataChangeListenerRegistration;
119 private Registration rpcReg;
/**
 * Creates the manager from referenced services and the component {@link Configuration}.
 *
 * <p>Unwraps the executors from the two thread pools and the {@code ActorSystem} from its provider, takes the
 * topology identifier and the write transaction idle timeout from {@code configuration}, and delegates to the
 * constructor that accepts those values directly.
 *
 * @param configuration supplies {@code topology$_$id()} and {@code write$_$transaction$_$idle$_$timeout()}
 */
122 public NetconfTopologyManager(@Reference final BaseNetconfSchemas baseSchemas,
123 @Reference final DataBroker dataBroker,
124 @Reference final ClusterSingletonServiceProvider clusterSingletonServiceProvider,
125 @Reference(target = "(type=global-netconf-ssh-scheduled-executor)")
126 final ScheduledThreadPool keepaliveExecutor,
127 @Reference(target = "(type=global-netconf-processing-executor)") final ThreadPool processingExecutor,
128 @Reference final ActorSystemProvider actorSystemProvider,
129 @Reference(target = "(type=global-event-executor)") final EventExecutor eventExecutor,
130 @Reference(target = "(type=netconf-client-dispatcher)") final NetconfClientDispatcher clientDispatcher,
131 @Reference final DOMMountPointService mountPointService,
132 @Reference final AAAEncryptionService encryptionService,
133 @Reference final RpcProviderService rpcProviderService,
134 @Reference final DeviceActionFactory deviceActionFactory,
135 @Reference final SchemaResourceManager resourceManager,
136 @Reference final NetconfClientConfigurationBuilderFactory builderFactory,
137 final Configuration configuration) {
138 this(baseSchemas, dataBroker, clusterSingletonServiceProvider, keepaliveExecutor.getExecutor(),
139 processingExecutor.getExecutor(), actorSystemProvider.getActorSystem(), eventExecutor, clientDispatcher,
140 mountPointService, encryptionService, rpcProviderService, deviceActionFactory, resourceManager,
// The int attribute is converted to Uint16 here; the target constructor turns it into a Duration of seconds.
141 builderFactory, configuration.topology$_$id(),
142 Uint16.valueOf(configuration.write$_$transaction$_$idle$_$timeout()));
/**
 * Creates the manager with default settings: the topology identifier is
 * {@code NetconfNodeUtils.DEFAULT_TOPOLOGY_NAME} and the write transaction idle timeout is {@code Uint16.ZERO}.
 *
 * <p>Unwraps the executors from the two thread pools and the {@code ActorSystem} from its provider before
 * delegating to the constructor that accepts those values directly.
 */
146 public NetconfTopologyManager(final BaseNetconfSchemas baseSchemas, final DataBroker dataBroker,
147 final ClusterSingletonServiceProvider clusterSingletonServiceProvider,
148 final ScheduledThreadPool keepaliveExecutor, final ThreadPool processingExecutor,
149 final ActorSystemProvider actorSystemProvider, final EventExecutor eventExecutor,
150 final NetconfClientDispatcher clientDispatcher, final DOMMountPointService mountPointService,
151 final AAAEncryptionService encryptionService, final RpcProviderService rpcProviderService,
152 final DeviceActionFactory deviceActionFactory, final SchemaResourceManager resourceManager,
153 final NetconfClientConfigurationBuilderFactory builderFactory) {
154 this(baseSchemas, dataBroker, clusterSingletonServiceProvider, keepaliveExecutor.getExecutor(),
155 processingExecutor.getExecutor(), actorSystemProvider.getActorSystem(), eventExecutor, clientDispatcher,
156 mountPointService, encryptionService, rpcProviderService, deviceActionFactory, resourceManager,
157 builderFactory, NetconfNodeUtils.DEFAULT_TOPOLOGY_NAME, Uint16.ZERO);
/**
 * Primary constructor: stores the dependencies, then registers this instance as a data tree change listener and
 * registers a {@code NetconfTopologyRPCProvider} as the {@code NetconfNodeTopologyService} implementation.
 *
 * <p>Both registrations happen before the constructor returns, which is why the FindBugs warning about leaking
 * {@code this} is suppressed.
 *
 * @param topologyId identifier of the topology to manage, must not be null
 * @param writeTransactionIdleTimeout idle timeout, converted to a {@code Duration} in seconds
 * @throws NullPointerException if any argument wrapped in {@code requireNonNull} below is null
 */
160 @SuppressFBWarnings(value = "MC_OVERRIDABLE_METHOD_CALL_IN_CONSTRUCTOR",
161 justification = "Non-final for mocking, but we register for DTCL and that leaks 'this'")
162 public NetconfTopologyManager(final BaseNetconfSchemas baseSchemas, final DataBroker dataBroker,
163 final ClusterSingletonServiceProvider clusterSingletonServiceProvider,
164 final ScheduledExecutorService keepaliveExecutor, final Executor processingExecutor,
165 final ActorSystem actorSystem, final EventExecutor eventExecutor,
166 final NetconfClientDispatcher clientDispatcher, final DOMMountPointService mountPointService,
167 final AAAEncryptionService encryptionService, final RpcProviderService rpcProviderService,
168 final DeviceActionFactory deviceActionFactory, final SchemaResourceManager resourceManager,
169 final NetconfClientConfigurationBuilderFactory builderFactory, final String topologyId,
170 final Uint16 writeTransactionIdleTimeout) {
171 this.baseSchemas = requireNonNull(baseSchemas);
172 this.dataBroker = requireNonNull(dataBroker);
173 this.clusterSingletonServiceProvider = requireNonNull(clusterSingletonServiceProvider);
174 this.keepaliveExecutor = requireNonNull(keepaliveExecutor);
175 this.processingExecutor = requireNonNull(processingExecutor);
176 this.actorSystem = requireNonNull(actorSystem);
177 this.eventExecutor = requireNonNull(eventExecutor);
178 this.clientDispatcher = requireNonNull(clientDispatcher);
179 this.topologyId = requireNonNull(topologyId);
180 writeTxIdleTimeout = Duration.ofSeconds(writeTransactionIdleTimeout.toJava());
// NOTE(review): mountPointService is the only stored dependency that is not null-checked - confirm whether a
// null mount point service is intentionally tolerated.
181 this.mountPointService = mountPointService;
182 this.encryptionService = requireNonNull(encryptionService);
183 this.rpcProviderService = requireNonNull(rpcProviderService);
184 this.deviceActionFactory = requireNonNull(deviceActionFactory);
185 this.resourceManager = requireNonNull(resourceManager);
186 this.builderFactory = requireNonNull(builderFactory);
// Registration is done last so that every field read by the listener callbacks is already assigned.
188 dataChangeListenerRegistration = registerDataTreeChangeListener();
189 rpcReg = rpcProviderService.registerRpcImplementation(NetconfNodeTopologyService.class,
190 new NetconfTopologyRPCProvider(dataBroker, encryptionService, topologyId));
/**
 * Dispatches each reported change of a topology node to the matching lifecycle operation.
 *
 * <p>Visible handling: {@code SUBTREE_MODIFIED} refreshes the node's context; a node that already has an entry in
 * {@code contexts} is refreshed, while the "created" branch starts a new context; the "deleted" branch stops the
 * context; the remaining branch only logs a warning.
 *
 * <p>NOTE(review): only the {@code SUBTREE_MODIFIED} case label is visible in this excerpt - the other case
 * labels and the {@code else}/{@code break} lines are elided, so the exact modification types of the remaining
 * branches should be confirmed against the complete file.
 *
 * @param changes modifications delivered for the path this listener is registered on
 */
194 public void onDataTreeChanged(final Collection<DataTreeModification<Node>> changes) {
195 for (final DataTreeModification<Node> change : changes) {
196 final DataObjectModification<Node> rootNode = change.getRootNode();
// The root identifier doubles as the key of the 'contexts' and 'clusterRegistrations' maps.
197 final InstanceIdentifier<Node> dataModifIdent = change.getRootPath().getRootIdentifier();
198 final NodeId nodeId = NetconfTopologyUtils.getNodeId(rootNode.getIdentifier());
199 switch (rootNode.getModificationType()) {
200 case SUBTREE_MODIFIED:
201 LOG.debug("Config for node {} updated", nodeId);
202 refreshNetconfDeviceContext(dataModifIdent, rootNode.getDataAfter());
// A known node is reconfigured in place rather than started a second time.
205 if (contexts.containsKey(dataModifIdent)) {
206 LOG.debug("RemoteDevice{{}} was already configured, reconfiguring node...", nodeId);
207 refreshNetconfDeviceContext(dataModifIdent, rootNode.getDataAfter());
209 LOG.debug("Config for node {} created", nodeId);
210 startNetconfDeviceContext(dataModifIdent, rootNode.getDataAfter());
214 LOG.debug("Config for node {} deleted", nodeId);
215 stopNetconfDeviceContext(dataModifIdent);
218 LOG.warn("Unknown operation for {}.", nodeId);
/**
 * Rebuilds the setup for {@code node} and hands it to the context already stored for {@code instanceIdentifier}.
 *
 * <p>NOTE(review): {@code contexts.get} returns null when no context is stored for the identifier, in which case
 * the {@code refresh} call below throws a {@code NullPointerException}. The {@code SUBTREE_MODIFIED} branch of
 * {@code onDataTreeChanged} calls this method without a prior {@code containsKey} check - confirm whether a
 * missing context can occur there (for example after a failed start).
 *
 * @param instanceIdentifier key of the context to refresh
 * @param node node data the new setup is created from
 */
223 private void refreshNetconfDeviceContext(final InstanceIdentifier<Node> instanceIdentifier, final Node node) {
224 final NetconfTopologyContext context = contexts.get(instanceIdentifier);
225 context.refresh(createSetup(instanceIdentifier, node));
228 // ClusterSingletonServiceRegistration registerClusterSingletonService method throws a Runtime exception if there
229 // are problems with registration and client has to deal with it. Only thing we can do if this error occurs is to
230 // retry registration several times and log the error.
231 // TODO change to a specific documented Exception when changed in ClusterSingletonServiceProvider
/**
 * Creates a {@code NetconfTopologyContext} for {@code node} and registers it as a cluster singleton service.
 *
 * <p>On successful registration the registration and the context are both stored under
 * {@code instanceIdentifier}. A {@code RuntimeException} from the registration is logged at warn level; the
 * final failure branch logs at error level and closes the newly created context, so nothing is stored for it.
 *
 * <p>NOTE(review): the {@code try} line and the retry loop control (attempt counter, loop exits) are elided from
 * this excerpt - confirm the number of attempts against the complete file.
 *
 * @param instanceIdentifier identifier of the node; its string form names the service group
 * @param node node configuration, which must carry the {@code NetconfNode} augmentation
 * @throws NullPointerException if {@code node} has no {@code NetconfNode} augmentation
 */
232 @SuppressWarnings("checkstyle:IllegalCatch")
233 private void startNetconfDeviceContext(final InstanceIdentifier<Node> instanceIdentifier, final Node node) {
234 final NetconfNode netconfNode = requireNonNull(node.augmentation(NetconfNode.class));
// The node's configured actor response wait time is interpreted as seconds.
236 final Timeout actorResponseWaitTime = Timeout.create(
237 Duration.ofSeconds(netconfNode.getActorResponseWaitTime().toJava()));
239 final ServiceGroupIdentifier serviceGroupIdent =
240 ServiceGroupIdentifier.create(instanceIdentifier.toString());
// Created through the overridable factory method, once, before any registration attempt.
242 final NetconfTopologyContext newNetconfTopologyContext = newNetconfTopologyContext(
243 createSetup(instanceIdentifier, node), serviceGroupIdent, actorResponseWaitTime, deviceActionFactory);
248 final ClusterSingletonServiceRegistration clusterSingletonServiceRegistration =
249 clusterSingletonServiceProvider.registerClusterSingletonService(newNetconfTopologyContext);
// Both maps are only updated once registration has succeeded.
250 clusterRegistrations.put(instanceIdentifier, clusterSingletonServiceRegistration);
251 contexts.put(instanceIdentifier, newNetconfTopologyContext);
253 } catch (final RuntimeException e) {
254 LOG.warn("Unable to register cluster singleton service {}, trying again", newNetconfTopologyContext, e);
257 LOG.error("Unable to register cluster singleton service {} - done trying, closing topology context",
258 newNetconfTopologyContext, e);
259 close(newNetconfTopologyContext);
/**
 * Removes the context stored for {@code instanceIdentifier} and, if one existed, closes the matching cluster
 * singleton registration followed by the context itself. Does nothing when no context is stored.
 *
 * <p>NOTE(review): {@code clusterRegistrations.remove} may return null if no registration is stored for the
 * identifier; that null is passed straight to {@code close(AutoCloseable)} - confirm that helper tolerates it.
 *
 * @param instanceIdentifier key of the context and registration to tear down
 */
266 private void stopNetconfDeviceContext(final InstanceIdentifier<Node> instanceIdentifier) {
267 final NetconfTopologyContext netconfTopologyContext = contexts.remove(instanceIdentifier);
268 if (netconfTopologyContext != null) {
// The registration is closed before the context it refers to.
269 close(clusterRegistrations.remove(instanceIdentifier));
270 close(netconfTopologyContext);
/**
 * Factory for the per-node {@code NetconfTopologyContext}; protected so that subclasses can substitute their own
 * instance.
 *
 * <p>NOTE(review): the {@code deviceActionFact} parameter is not used - the {@code deviceActionFactory} field is
 * passed to the context instead. The only visible caller passes that same field, so the result is identical
 * there, but an overriding or future caller supplying a different factory would be ignored.
 *
 * @param setup topology setup for the node
 * @param serviceGroupIdent service group identifier of the context
 * @param actorResponseWaitTime timeout handed to the context
 * @param deviceActionFact currently unused, see the note above
 * @return a new context built from this manager's resource manager, mount point service and builder factory
 */
275 protected NetconfTopologyContext newNetconfTopologyContext(final NetconfTopologySetup setup,
276 final ServiceGroupIdentifier serviceGroupIdent, final Timeout actorResponseWaitTime,
277 final DeviceActionFactory deviceActionFact) {
278 return new NetconfTopologyContext(resourceManager, mountPointService, builderFactory, deviceActionFactory,
279 actorResponseWaitTime, serviceGroupIdent, setup);
/**
 * Shuts the manager down: handles the RPC registration when it is non-null, closes and nulls the data tree change
 * listener registration, closes every stored topology context and every cluster singleton registration, and
 * clears {@code clusterRegistrations}.
 *
 * <p>NOTE(review): the statements inside the {@code rpcReg} guard, and any statement between the two
 * {@code forEach} calls and the final {@code clear()}, are elided from this excerpt.
 */
285 public void close() {
286 if (rpcReg != null) {
290 if (dataChangeListenerRegistration != null) {
291 dataChangeListenerRegistration.close();
// Nulled so that a repeated close() skips this branch.
292 dataChangeListenerRegistration = null;
// Individual close failures are logged by the static close(AutoCloseable) helper rather than propagated.
295 contexts.values().forEach(NetconfTopologyManager::close);
296 clusterRegistrations.values().forEach(NetconfTopologyManager::close);
299 clusterRegistrations.clear();
/**
 * Best-effort close helper: any {@code Exception} caught here is logged at warn level together with the resource
 * and is not rethrown.
 *
 * <p>NOTE(review): the {@code try} body is elided from this excerpt; it presumably calls
 * {@code closeable.close()} - confirm against the complete file.
 *
 * @param closeable resource to close
 */
302 @SuppressWarnings("checkstyle:IllegalCatch")
303 private static void close(final AutoCloseable closeable) {
306 } catch (Exception e) {
307 LOG.warn("Error closing {}", closeable, e);
/**
 * Ensures the managed topology exists in the operational datastore and registers this manager as a listener on
 * the topology's nodes in the configuration datastore.
 *
 * <p>The topology entry is merged in a write-only transaction whose commit outcome is only logged: the listener
 * is registered without waiting for the commit to complete.
 *
 * @return registration of this manager for {@code Node} changes under the managed topology
 */
311 private ListenerRegistration<NetconfTopologyManager> registerDataTreeChangeListener() {
312 final WriteTransaction wtx = dataBroker.newWriteOnlyTransaction();
313 // FIXME: how does this play out with lifecycle? In a cluster, someone needs to ensure this call happens, but
314 // also we need to make sure config -> oper is properly synchronized. Non-clustered case relies on
315 // oper being transient and perhaps on a put() instead, how do we handle that in the clustered case?
// merge() rather than put(): the written Topology carries only its identifier.
316 wtx.merge(LogicalDatastoreType.OPERATIONAL, InstanceIdentifier.builder(NetworkTopology.class)
317 .child(Topology.class, new TopologyKey(new TopologyId(topologyId)))
318 .build(), new TopologyBuilder().setTopologyId(new TopologyId(topologyId)).build());
// The callback runs on a direct executor and does nothing beyond logging the outcome.
319 wtx.commit().addCallback(new FutureCallback<CommitInfo>() {
321 public void onSuccess(final CommitInfo result) {
322 LOG.debug("topology initialization successful");
326 public void onFailure(final Throwable throwable) {
327 LOG.error("Unable to initialize netconf-topology", throwable);
329 }, MoreExecutors.directExecutor());
331 LOG.debug("Registering datastore listener");
// Listens on the Node children of the managed topology in the CONFIGURATION datastore.
332 return dataBroker.registerDataTreeChangeListener(DataTreeIdentifier.create(LogicalDatastoreType.CONFIGURATION,
333 NetconfTopologyUtils.createTopologyListPath(topologyId).child(Node.class)), this);
336 private NetconfTopologySetup createSetup(final InstanceIdentifier<Node> instanceIdentifier, final Node node) {
337 final NetconfNode netconfNode = node.augmentation(NetconfNode.class);
338 final RemoteDeviceId deviceId = NetconfNodeUtils.toRemoteDeviceId(node.getNodeId(), netconfNode);
340 return NetconfTopologySetupBuilder.create()
341 .setClusterSingletonServiceProvider(clusterSingletonServiceProvider)
342 .setBaseSchemas(baseSchemas)
343 .setDataBroker(dataBroker)
344 .setInstanceIdentifier(instanceIdentifier)
346 .setActorSystem(actorSystem)
347 .setEventExecutor(eventExecutor)
348 .setKeepaliveExecutor(keepaliveExecutor)
349 .setProcessingExecutor(processingExecutor)
350 .setTopologyId(topologyId)
351 .setNetconfClientDispatcher(clientDispatcher)
352 .setSchemaResourceDTO(resourceManager.getSchemaResources(netconfNode.getSchemaCacheDirectory(),
354 .setIdleTimeout(writeTxIdleTimeout)