2 * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.netconf.topology.singleton.impl;
10 import static java.util.Objects.requireNonNull;
12 import akka.actor.ActorSystem;
13 import akka.util.Timeout;
14 import com.google.common.annotations.VisibleForTesting;
15 import com.google.common.util.concurrent.FutureCallback;
16 import com.google.common.util.concurrent.MoreExecutors;
17 import io.netty.util.concurrent.EventExecutor;
18 import java.time.Duration;
19 import java.util.Collection;
21 import java.util.concurrent.ConcurrentHashMap;
22 import java.util.concurrent.Executor;
23 import java.util.concurrent.ScheduledExecutorService;
24 import org.opendaylight.aaa.encrypt.AAAEncryptionService;
25 import org.opendaylight.controller.cluster.ActorSystemProvider;
26 import org.opendaylight.controller.config.threadpool.ScheduledThreadPool;
27 import org.opendaylight.controller.config.threadpool.ThreadPool;
28 import org.opendaylight.mdsal.binding.api.ClusteredDataTreeChangeListener;
29 import org.opendaylight.mdsal.binding.api.DataBroker;
30 import org.opendaylight.mdsal.binding.api.DataObjectModification;
31 import org.opendaylight.mdsal.binding.api.DataTreeIdentifier;
32 import org.opendaylight.mdsal.binding.api.DataTreeModification;
33 import org.opendaylight.mdsal.binding.api.RpcProviderService;
34 import org.opendaylight.mdsal.binding.api.WriteTransaction;
35 import org.opendaylight.mdsal.common.api.CommitInfo;
36 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
37 import org.opendaylight.mdsal.dom.api.DOMActionProviderService;
38 import org.opendaylight.mdsal.dom.api.DOMMountPointService;
39 import org.opendaylight.mdsal.dom.api.DOMRpcProviderService;
40 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
41 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceRegistration;
42 import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier;
43 import org.opendaylight.netconf.client.NetconfClientDispatcher;
44 import org.opendaylight.netconf.client.mdsal.api.BaseNetconfSchemas;
45 import org.opendaylight.netconf.client.mdsal.api.DeviceActionFactory;
46 import org.opendaylight.netconf.client.mdsal.api.RemoteDeviceId;
47 import org.opendaylight.netconf.client.mdsal.api.SchemaResourceManager;
48 import org.opendaylight.netconf.topology.singleton.api.NetconfTopologySingletonService;
49 import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup;
50 import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup.NetconfTopologySetupBuilder;
51 import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologyUtils;
52 import org.opendaylight.netconf.topology.spi.NetconfClientConfigurationBuilderFactory;
53 import org.opendaylight.netconf.topology.spi.NetconfNodeUtils;
54 import org.opendaylight.netconf.topology.spi.NetconfTopologyRPCProvider;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev221225.NetconfNode;
56 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev221225.NetconfNodeTopologyService;
57 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.topology.singleton.config.rev170419.Config;
58 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology;
59 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
60 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.TopologyId;
61 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
62 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyBuilder;
63 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey;
64 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
65 import org.opendaylight.yangtools.concepts.ListenerRegistration;
66 import org.opendaylight.yangtools.concepts.Registration;
67 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
68 import org.slf4j.Logger;
69 import org.slf4j.LoggerFactory;
/**
 * Data tree change listener for topology {@link Node} entries that keeps one {@link NetconfTopologyContext}
 * per node and registers each context with the {@link ClusterSingletonServiceProvider}.
 *
 * NOTE(review): every line of this listing starts with a fused line number, and short lines (blank lines,
 * lone braces, annotations, short statements) appear to be missing - restore them from the original file
 * before compiling.
 * NOTE(review): {@code Map} is used below but no {@code java.util.Map} import is visible in the import list
 * - confirm it is present in the original file.
 */
71 public class NetconfTopologyManager
72 implements ClusteredDataTreeChangeListener<Node>, NetconfTopologySingletonService, AutoCloseable {
74 private static final Logger LOG = LoggerFactory.getLogger(NetconfTopologyManager.class);
// Contexts of started nodes, keyed by node path; an entry is added only after the context has been
// registered as a cluster singleton service.
76 private final Map<InstanceIdentifier<Node>, NetconfTopologyContext> contexts = new ConcurrentHashMap<>();
// Cluster singleton registrations, keyed by the same node path as contexts.
77 private final Map<InstanceIdentifier<Node>, ClusterSingletonServiceRegistration>
78 clusterRegistrations = new ConcurrentHashMap<>();
// Collaborators captured by the constructor.
80 private final BaseNetconfSchemas baseSchemas;
81 private final DataBroker dataBroker;
82 private final DOMRpcProviderService rpcProviderRegistry;
83 private final DOMActionProviderService actionProviderRegistry;
84 private final ClusterSingletonServiceProvider clusterSingletonServiceProvider;
85 private final ScheduledExecutorService keepaliveExecutorService;
86 private final Executor processingExecutorService;
87 private final ActorSystem actorSystem;
88 private final EventExecutor eventExecutor;
89 private final NetconfClientDispatcher clientDispatcher;
90 private final String topologyId;
// Derived from Config.getWriteTransactionIdleTimeout(), interpreted as seconds.
91 private final Duration writeTxIdleTimeout;
92 private final DOMMountPointService mountPointService;
93 private final AAAEncryptionService encryptionService;
94 private final RpcProviderService rpcProviderService;
95 private final DeviceActionFactory deviceActionFactory;
96 private final NetconfClientConfigurationBuilderFactory builderFactory;
97 private final SchemaResourceManager resourceManager;
// Not final: assigned when the listener and the RPC implementation are registered (after construction);
// close() null-checks both.
99 private ListenerRegistration<NetconfTopologyManager> dataChangeListenerRegistration;
100 private Registration rpcReg;
/**
 * Captures the collaborators used to build per-node topology contexts. Nothing is registered here;
 * registration happens in the separate init step.
 *
 * <p>Most arguments are validated with {@code requireNonNull}. {@code keepaliveExecutor},
 * {@code processingExecutor} and {@code config} are dereferenced directly, so a {@code null} there also
 * fails with a {@link NullPointerException}, only without an explicit check. {@code mountPointService}
 * is stored without any null check.
 *
 * @param keepaliveExecutor pool whose {@code getExecutor()} result is kept as the keepalive executor
 * @param processingExecutor pool whose {@code getExecutor()} result is kept as the processing executor
 * @param actorSystemProvider source of the {@link ActorSystem}
 * @param topologyId identifier of the topology this manager handles
 * @param config supplies the write-transaction idle timeout, read as a number of seconds
 * @throws NullPointerException if a required argument is {@code null}
 */
103 public NetconfTopologyManager(final BaseNetconfSchemas baseSchemas, final DataBroker dataBroker,
104 final DOMRpcProviderService rpcProviderRegistry,
105 final DOMActionProviderService actionProviderService,
106 final ClusterSingletonServiceProvider clusterSingletonServiceProvider,
107 final ScheduledThreadPool keepaliveExecutor, final ThreadPool processingExecutor,
108 final ActorSystemProvider actorSystemProvider,
109 final EventExecutor eventExecutor, final NetconfClientDispatcher clientDispatcher,
110 final String topologyId, final Config config,
111 final DOMMountPointService mountPointService,
112 final AAAEncryptionService encryptionService,
113 final RpcProviderService rpcProviderService,
114 final DeviceActionFactory deviceActionFactory,
115 final SchemaResourceManager resourceManager,
116 final NetconfClientConfigurationBuilderFactory builderFactory) {
117 this.baseSchemas = requireNonNull(baseSchemas);
118 this.dataBroker = requireNonNull(dataBroker);
119 this.rpcProviderRegistry = requireNonNull(rpcProviderRegistry);
120 actionProviderRegistry = requireNonNull(actionProviderService);
121 this.clusterSingletonServiceProvider = requireNonNull(clusterSingletonServiceProvider);
// No requireNonNull on the two pools: a null pool fails right here on getExecutor().
122 keepaliveExecutorService = keepaliveExecutor.getExecutor();
123 processingExecutorService = processingExecutor.getExecutor();
124 actorSystem = requireNonNull(actorSystemProvider).getActorSystem();
125 this.eventExecutor = requireNonNull(eventExecutor);
126 this.clientDispatcher = requireNonNull(clientDispatcher);
127 this.topologyId = requireNonNull(topologyId);
// The configured idle timeout is converted to a Duration in seconds.
128 writeTxIdleTimeout = Duration.ofSeconds(config.getWriteTransactionIdleTimeout().toJava());
// NOTE(review): the only reference argument stored without a null check - confirm null is really allowed.
129 this.mountPointService = mountPointService;
130 this.encryptionService = requireNonNull(encryptionService);
131 this.rpcProviderService = requireNonNull(rpcProviderService);
132 this.deviceActionFactory = requireNonNull(deviceActionFactory);
133 this.resourceManager = requireNonNull(resourceManager);
134 this.builderFactory = requireNonNull(builderFactory);
// Registers the data tree change listener and the NetconfNodeTopologyService RPC implementation, keeping
// both registrations in fields so that close() can find them.
// NOTE(review): the method signature line is not visible in this listing (presumably the init method
// named in the comment below) - confirm against the original file.
137 // Blueprint init method
139 dataChangeListenerRegistration = registerDataTreeChangeListener();
140 rpcReg = rpcProviderService.registerRpcImplementation(NetconfNodeTopologyService.class,
141 new NetconfTopologyRPCProvider(dataBroker, encryptionService, topologyId));
/**
 * Handles a batch of node modifications, mapping each one to a refresh, start or stop of the node's
 * topology context depending on its modification type.
 *
 * NOTE(review): several short lines of the switch (case labels other than SUBTREE_MODIFIED, break
 * statements, the else and the default label) are not visible in this listing - the branch boundaries
 * noted below are inferred from the log messages and must be confirmed against the original file.
 *
 * @param changes modifications to process, in iteration order
 */
145 public void onDataTreeChanged(final Collection<DataTreeModification<Node>> changes) {
146 for (final DataTreeModification<Node> change : changes) {
147 final DataObjectModification<Node> rootNode = change.getRootNode();
148 final InstanceIdentifier<Node> dataModifIdent = change.getRootPath().getRootIdentifier();
149 final NodeId nodeId = NetconfTopologyUtils.getNodeId(rootNode.getIdentifier());
150 switch (rootNode.getModificationType()) {
151 case SUBTREE_MODIFIED:
152 LOG.debug("Config for node {} updated", nodeId);
// Refresh is unconditional here: refreshNetconfDeviceContext() looks the context up itself.
153 refreshNetconfDeviceContext(dataModifIdent, rootNode.getDataAfter());
// Presumably the write branch: an already tracked node is reconfigured ...
156 if (contexts.containsKey(dataModifIdent)) {
157 LOG.debug("RemoteDevice{{}} was already configured, reconfiguring node...", nodeId);
158 refreshNetconfDeviceContext(dataModifIdent, rootNode.getDataAfter());
// ... and (presumably in the else branch) an untracked node gets a new context.
160 LOG.debug("Config for node {} created", nodeId);
161 startNetconfDeviceContext(dataModifIdent, rootNode.getDataAfter());
// Presumably the delete branch.
165 LOG.debug("Config for node {} deleted", nodeId);
166 stopNetconfDeviceContext(dataModifIdent);
// Presumably the default branch: nothing is done beyond logging.
169 LOG.warn("Unknown operation for {}.", nodeId);
174 private void refreshNetconfDeviceContext(final InstanceIdentifier<Node> instanceIdentifier, final Node node) {
175 final NetconfTopologyContext context = contexts.get(instanceIdentifier);
176 context.refresh(createSetup(instanceIdentifier, node));
// ClusterSingletonServiceRegistration registerClusterSingletonService method throws a Runtime exception if there
// are problems with registration and client has to deal with it. Only thing we can do if this error occurs is to
// retry registration several times and log the error.
// TODO change to a specific documented Exception when changed in ClusterSingletonServiceProvider
//
// Creates a topology context for the node and registers it as a cluster singleton service. On success the
// registration and the context are recorded under the node path; when registration finally fails the new
// context is closed and nothing is recorded.
// NOTE(review): the retry loop header, the retry counter and the try/break lines are not visible in this
// listing - the number of attempts cannot be told from here; confirm against the original file.
183 @SuppressWarnings("checkstyle:IllegalCatch")
184 private void startNetconfDeviceContext(final InstanceIdentifier<Node> instanceIdentifier, final Node node) {
// Fails with NullPointerException when the node carries no NetconfNode augmentation.
185 final NetconfNode netconfNode = requireNonNull(node.augmentation(NetconfNode.class));
// The node's actor-response-wait-time is read as a number of seconds.
187 final Timeout actorResponseWaitTime = Timeout.create(
188 Duration.ofSeconds(netconfNode.getActorResponseWaitTime().toJava()));
// The service group identifier is the string form of the node path.
190 final ServiceGroupIdentifier serviceGroupIdent =
191 ServiceGroupIdentifier.create(instanceIdentifier.toString());
193 final NetconfTopologyContext newNetconfTopologyContext = newNetconfTopologyContext(
194 createSetup(instanceIdentifier, node), serviceGroupIdent, actorResponseWaitTime, deviceActionFactory);
// The registration is stored before the context, so a key present in contexts has a registration too.
199 final ClusterSingletonServiceRegistration clusterSingletonServiceRegistration =
200 clusterSingletonServiceProvider.registerClusterSingletonService(newNetconfTopologyContext);
201 clusterRegistrations.put(instanceIdentifier, clusterSingletonServiceRegistration);
202 contexts.put(instanceIdentifier, newNetconfTopologyContext);
204 } catch (final RuntimeException e) {
205 LOG.warn("Unable to register cluster singleton service {}, trying again", newNetconfTopologyContext, e);
// Presumably guarded by the exhausted-retries check: give up and release the never-registered context.
208 LOG.error("Unable to register cluster singleton service {} - done trying, closing topology context",
209 newNetconfTopologyContext, e);
210 close(newNetconfTopologyContext);
/**
 * Stops tracking a node: removes its context and, if one was tracked, closes the node's cluster singleton
 * registration followed by the context itself. Does nothing when no context is tracked for the node.
 *
 * @param instanceIdentifier path of the node to stop
 */
217 private void stopNetconfDeviceContext(final InstanceIdentifier<Node> instanceIdentifier) {
218 final NetconfTopologyContext netconfTopologyContext = contexts.remove(instanceIdentifier);
219 if (netconfTopologyContext != null) {
// The registration is closed before the context.
// NOTE(review): remove() may return null if the registration map was cleared meanwhile - confirm the
// close helper tolerates a null argument.
220 close(clusterRegistrations.remove(instanceIdentifier));
221 close(netconfTopologyContext);
226 protected NetconfTopologyContext newNetconfTopologyContext(final NetconfTopologySetup setup,
227 final ServiceGroupIdentifier serviceGroupIdent, final Timeout actorResponseWaitTime,
228 final DeviceActionFactory deviceActionFact) {
229 return new NetconfTopologyContext(resourceManager, mountPointService, builderFactory, deviceActionFactory,
230 actorResponseWaitTime, serviceGroupIdent, setup);
/**
 * Releases what this manager registered: the RPC registration, the data tree change listener registration,
 * every tracked context and every cluster singleton registration.
 *
 * NOTE(review): the statements inside the rpcReg check, and any statement between closing the values and
 * clearing clusterRegistrations, are not visible in this listing - confirm against the original file.
 */
234 public void close() {
235 if (rpcReg != null) {
// The listener registration is closed and the field reset, so a repeated close() skips this part.
239 if (dataChangeListenerRegistration != null) {
240 dataChangeListenerRegistration.close();
241 dataChangeListenerRegistration = null;
// Contexts are closed before their cluster singleton registrations; failures are logged by the helper.
244 contexts.values().forEach(NetconfTopologyManager::close);
245 clusterRegistrations.values().forEach(NetconfTopologyManager::close);
248 clusterRegistrations.clear();
/**
 * Best-effort close helper: any exception caught while closing is logged at WARN level, together with the
 * object being closed, and is not rethrown.
 *
 * NOTE(review): the try line and the actual close call are not visible in this listing - confirm against
 * the original file.
 *
 * @param closeable object to close
 */
251 @SuppressWarnings("checkstyle:IllegalCatch")
252 private static void close(final AutoCloseable closeable) {
255 } catch (Exception e) {
256 LOG.warn("Error closing {}", closeable, e);
/**
 * Merges an empty topology entry for {@code topologyId} into the operational datastore and then registers
 * this object as a listener for {@link Node} changes under that topology in the configuration datastore.
 *
 * <p>The commit is not awaited: its outcome is only logged from a callback run on a direct executor, and
 * the listener is registered regardless of whether the commit succeeds.
 *
 * @return the listener registration
 */
260 private ListenerRegistration<NetconfTopologyManager> registerDataTreeChangeListener() {
261 final WriteTransaction wtx = dataBroker.newWriteOnlyTransaction();
262 // FIXME: how does this play out with lifecycle? In a cluster, someone needs to ensure this call happens, but
263 // also we need to make sure config -> oper is properly synchronized. Non-clustered case relies on
264 // oper being transient and perhaps on a put() instead, how do we handle that in the clustered case?
265 wtx.merge(LogicalDatastoreType.OPERATIONAL, InstanceIdentifier.builder(NetworkTopology.class)
266 .child(Topology.class, new TopologyKey(new TopologyId(topologyId)))
267 .build(), new TopologyBuilder().setTopologyId(new TopologyId(topologyId)).build());
268 wtx.commit().addCallback(new FutureCallback<CommitInfo>() {
270 public void onSuccess(final CommitInfo result) {
271 LOG.debug("topology initialization successful");
// A failed commit is only logged; it is not propagated to the caller.
275 public void onFailure(final Throwable throwable) {
276 LOG.error("Unable to initialize netconf-topology", throwable);
278 }, MoreExecutors.directExecutor());
280 LOG.debug("Registering datastore listener");
281 return dataBroker.registerDataTreeChangeListener(DataTreeIdentifier.create(LogicalDatastoreType.CONFIGURATION,
282 NetconfTopologyUtils.createTopologyListPath(topologyId).child(Node.class)), this);
285 private NetconfTopologySetup createSetup(final InstanceIdentifier<Node> instanceIdentifier, final Node node) {
286 final NetconfNode netconfNode = node.augmentation(NetconfNode.class);
287 final RemoteDeviceId deviceId = NetconfNodeUtils.toRemoteDeviceId(node.getNodeId(), netconfNode);
289 return NetconfTopologySetupBuilder.create()
290 .setClusterSingletonServiceProvider(clusterSingletonServiceProvider)
291 .setBaseSchemas(baseSchemas)
292 .setDataBroker(dataBroker)
293 .setInstanceIdentifier(instanceIdentifier)
294 .setRpcProviderRegistry(rpcProviderRegistry)
295 .setActionProviderRegistry(actionProviderRegistry)
297 .setActorSystem(actorSystem)
298 .setEventExecutor(eventExecutor)
299 .setKeepaliveExecutor(keepaliveExecutorService)
300 .setProcessingExecutor(processingExecutorService)
301 .setTopologyId(topologyId)
302 .setNetconfClientDispatcher(clientDispatcher)
303 .setSchemaResourceDTO(resourceManager.getSchemaResources(netconfNode.getSchemaCacheDirectory(),
305 .setIdleTimeout(writeTxIdleTimeout)