2 * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.netconf.topology.singleton.impl;
10 import static java.util.Objects.requireNonNull;
12 import akka.actor.ActorSystem;
13 import akka.util.Timeout;
14 import com.google.common.annotations.VisibleForTesting;
15 import com.google.common.util.concurrent.FutureCallback;
16 import com.google.common.util.concurrent.ListeningExecutorService;
17 import com.google.common.util.concurrent.MoreExecutors;
18 import io.netty.util.concurrent.EventExecutor;
19 import java.time.Duration;
20 import java.util.Collection;
22 import java.util.concurrent.ConcurrentHashMap;
23 import java.util.concurrent.ScheduledExecutorService;
24 import org.opendaylight.aaa.encrypt.AAAEncryptionService;
25 import org.opendaylight.controller.cluster.ActorSystemProvider;
26 import org.opendaylight.controller.config.threadpool.ScheduledThreadPool;
27 import org.opendaylight.controller.config.threadpool.ThreadPool;
28 import org.opendaylight.mdsal.binding.api.ClusteredDataTreeChangeListener;
29 import org.opendaylight.mdsal.binding.api.DataBroker;
30 import org.opendaylight.mdsal.binding.api.DataObjectModification;
31 import org.opendaylight.mdsal.binding.api.DataTreeIdentifier;
32 import org.opendaylight.mdsal.binding.api.DataTreeModification;
33 import org.opendaylight.mdsal.binding.api.RpcProviderService;
34 import org.opendaylight.mdsal.binding.api.WriteTransaction;
35 import org.opendaylight.mdsal.common.api.CommitInfo;
36 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
37 import org.opendaylight.mdsal.dom.api.DOMActionProviderService;
38 import org.opendaylight.mdsal.dom.api.DOMMountPointService;
39 import org.opendaylight.mdsal.dom.api.DOMRpcProviderService;
40 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
41 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceRegistration;
42 import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier;
43 import org.opendaylight.netconf.client.NetconfClientDispatcher;
44 import org.opendaylight.netconf.client.mdsal.api.BaseNetconfSchemas;
45 import org.opendaylight.netconf.client.mdsal.api.CredentialProvider;
46 import org.opendaylight.netconf.client.mdsal.api.DeviceActionFactory;
47 import org.opendaylight.netconf.client.mdsal.api.RemoteDeviceId;
48 import org.opendaylight.netconf.client.mdsal.api.SchemaResourceManager;
49 import org.opendaylight.netconf.client.mdsal.api.SslHandlerFactoryProvider;
50 import org.opendaylight.netconf.topology.singleton.api.NetconfTopologySingletonService;
51 import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup;
52 import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup.NetconfTopologySetupBuilder;
53 import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologyUtils;
54 import org.opendaylight.netconf.topology.spi.NetconfNodeUtils;
55 import org.opendaylight.netconf.topology.spi.NetconfTopologyRPCProvider;
56 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev221225.NetconfNode;
57 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev221225.NetconfNodeTopologyService;
58 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.topology.singleton.config.rev170419.Config;
59 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology;
60 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
61 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.TopologyId;
62 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
63 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyBuilder;
64 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey;
65 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
66 import org.opendaylight.yangtools.concepts.ListenerRegistration;
67 import org.opendaylight.yangtools.concepts.Registration;
68 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
69 import org.slf4j.Logger;
70 import org.slf4j.LoggerFactory;
/**
 * Manager of NETCONF topology nodes for a single topology id. It is a {@link ClusteredDataTreeChangeListener} for
 * {@link Node} entries and tracks, per node instance identifier, one {@link NetconfTopologyContext} and one
 * {@link ClusterSingletonServiceRegistration}.
 */
72 public class NetconfTopologyManager
73 implements ClusteredDataTreeChangeListener<Node>, NetconfTopologySingletonService, AutoCloseable {
75 private static final Logger LOG = LoggerFactory.getLogger(NetconfTopologyManager.class);
// Topology contexts of the nodes currently tracked, keyed by the node's instance identifier.
77 private final Map<InstanceIdentifier<Node>, NetconfTopologyContext> contexts = new ConcurrentHashMap<>();
// Cluster singleton registrations, keyed by the same identifiers as the contexts map.
78 private final Map<InstanceIdentifier<Node>, ClusterSingletonServiceRegistration>
79 clusterRegistrations = new ConcurrentHashMap<>();
// Collaborators captured by the constructor.
81 private final BaseNetconfSchemas baseSchemas;
82 private final DataBroker dataBroker;
83 private final DOMRpcProviderService rpcProviderRegistry;
84 private final DOMActionProviderService actionProviderRegistry;
85 private final ClusterSingletonServiceProvider clusterSingletonServiceProvider;
86 private final ScheduledThreadPool keepaliveExecutor;
// Executor obtained from keepaliveExecutor in the constructor.
87 private final ScheduledExecutorService keepaliveExecutorService;
88 private final ThreadPool processingExecutor;
// Listening decorator around processingExecutor's executor, created in the constructor.
89 private final ListeningExecutorService processingExecutorService;
// Obtained from the ActorSystemProvider passed to the constructor.
90 private final ActorSystem actorSystem;
91 private final EventExecutor eventExecutor;
92 private final NetconfClientDispatcher clientDispatcher;
93 private final String topologyId;
// Derived from the configured write-transaction idle timeout, interpreted as seconds.
94 private final Duration writeTxIdleTimeout;
95 private final DOMMountPointService mountPointService;
96 private final AAAEncryptionService encryptionService;
97 private final RpcProviderService rpcProviderService;
98 private final DeviceActionFactory deviceActionFactory;
99 private final CredentialProvider credentialProvider;
100 private final SslHandlerFactoryProvider sslHandlerFactoryProvider;
101 private final SchemaResourceManager resourceManager;
// Not final: assigned when the listener and the RPC implementation are registered; close() null-checks both.
103 private ListenerRegistration<NetconfTopologyManager> dataChangeListenerRegistration;
104 private Registration rpcReg;
/**
 * Creates the manager and captures its collaborators. The constructor only stores or derives values (executors,
 * actor system, idle timeout); listener and RPC registration are performed separately.
 *
 * <p>Every argument that is dereferenced or was already null-checked is validated with a named
 * {@code requireNonNull}, so a missing dependency is reported by name. The exception type remains
 * {@link NullPointerException} and the validation order is unchanged.
 *
 * @param baseSchemas base NETCONF schemas
 * @param dataBroker data broker used for datastore access and listener registration
 * @param rpcProviderRegistry DOM RPC provider service handed to each node setup
 * @param actionProviderService DOM action provider service handed to each node setup
 * @param clusterSingletonServiceProvider provider used to register per-node singleton services
 * @param keepaliveExecutor scheduled thread pool; its executor is also extracted here
 * @param processingExecutor thread pool; its executor is wrapped in a listening decorator here
 * @param actorSystemProvider source of the actor system
 * @param eventExecutor event executor handed to each node setup
 * @param clientDispatcher NETCONF client dispatcher handed to each node setup
 * @param topologyId identifier of the managed topology
 * @param config configuration supplying the write-transaction idle timeout, read as seconds
 * @param mountPointService mount point service passed to each topology context
 * @param encryptionService encryption service
 * @param rpcProviderService binding RPC provider service used for the topology RPC implementation
 * @param deviceActionFactory device action factory
 * @param resourceManager schema resource manager
 * @param credentialProvider credential provider
 * @param sslHandlerFactoryProvider SSL handler factory provider
 * @throws NullPointerException if any validated argument is null
 */
public NetconfTopologyManager(final BaseNetconfSchemas baseSchemas, final DataBroker dataBroker,
        final DOMRpcProviderService rpcProviderRegistry,
        final DOMActionProviderService actionProviderService,
        final ClusterSingletonServiceProvider clusterSingletonServiceProvider,
        final ScheduledThreadPool keepaliveExecutor, final ThreadPool processingExecutor,
        final ActorSystemProvider actorSystemProvider,
        final EventExecutor eventExecutor, final NetconfClientDispatcher clientDispatcher,
        final String topologyId, final Config config,
        final DOMMountPointService mountPointService,
        final AAAEncryptionService encryptionService,
        final RpcProviderService rpcProviderService,
        final DeviceActionFactory deviceActionFactory,
        final SchemaResourceManager resourceManager,
        final CredentialProvider credentialProvider,
        final SslHandlerFactoryProvider sslHandlerFactoryProvider) {
    this.baseSchemas = requireNonNull(baseSchemas, "baseSchemas");
    this.dataBroker = requireNonNull(dataBroker, "dataBroker");
    this.rpcProviderRegistry = requireNonNull(rpcProviderRegistry, "rpcProviderRegistry");
    this.actionProviderRegistry = requireNonNull(actionProviderService, "actionProviderService");
    this.clusterSingletonServiceProvider =
        requireNonNull(clusterSingletonServiceProvider, "clusterSingletonServiceProvider");
    // These two were dereferenced unchecked before; a null now fails with the argument's name.
    this.keepaliveExecutor = requireNonNull(keepaliveExecutor, "keepaliveExecutor");
    this.keepaliveExecutorService = keepaliveExecutor.getExecutor();
    this.processingExecutor = requireNonNull(processingExecutor, "processingExecutor");
    this.processingExecutorService = MoreExecutors.listeningDecorator(processingExecutor.getExecutor());
    this.actorSystem = requireNonNull(actorSystemProvider, "actorSystemProvider").getActorSystem();
    this.eventExecutor = requireNonNull(eventExecutor, "eventExecutor");
    this.clientDispatcher = requireNonNull(clientDispatcher, "clientDispatcher");
    this.topologyId = requireNonNull(topologyId, "topologyId");
    this.writeTxIdleTimeout = Duration.ofSeconds(
        requireNonNull(config, "config").getWriteTransactionIdleTimeout().toJava());
    // NOTE(review): mountPointService was never null-checked and is only passed through; left unchecked here to
    // avoid rejecting callers that may legitimately pass null — confirm and tighten if null is not valid.
    this.mountPointService = mountPointService;
    this.encryptionService = requireNonNull(encryptionService, "encryptionService");
    this.rpcProviderService = requireNonNull(rpcProviderService, "rpcProviderService");
    this.deviceActionFactory = requireNonNull(deviceActionFactory, "deviceActionFactory");
    this.resourceManager = requireNonNull(resourceManager, "resourceManager");
    this.credentialProvider = requireNonNull(credentialProvider, "credentialProvider");
    this.sslHandlerFactoryProvider = requireNonNull(sslHandlerFactoryProvider, "sslHandlerFactoryProvider");
}
144 // Blueprint init method
// NOTE(review): the method signature line is not present in this view; the statements below are its body.
// Registers this manager as a data tree change listener and stores the registration, then registers a
// NetconfTopologyRPCProvider as the NetconfNodeTopologyService implementation and stores that registration too.
146 dataChangeListenerRegistration = registerDataTreeChangeListener();
147 rpcReg = rpcProviderService.registerRpcImplementation(NetconfNodeTopologyService.class,
148 new NetconfTopologyRPCProvider(dataBroker, encryptionService, topologyId));
/**
 * Processes a batch of {@link Node} modifications. For every change it resolves the root modification, the node's
 * instance identifier and its {@link NodeId}, and then acts according to the modification type.
 *
 * @param changes modifications delivered by the data broker
 */
// NOTE(review): several lines of this method (the remaining case labels, break statements, the else keyword and the
// closing braces) are not present in this view; the branch notes below rely on the visible log messages only.
152 public void onDataTreeChanged(final Collection<DataTreeModification<Node>> changes) {
153 for (final DataTreeModification<Node> change : changes) {
154 final DataObjectModification<Node> rootNode = change.getRootNode();
155 final InstanceIdentifier<Node> dataModifIdent = change.getRootPath().getRootIdentifier();
156 final NodeId nodeId = NetconfTopologyUtils.getNodeId(rootNode.getIdentifier());
157 switch (rootNode.getModificationType()) {
158 case SUBTREE_MODIFIED:
// Configuration of a node changed below its root: refresh its context with the data after the change.
159 LOG.debug("Config for node {} updated", nodeId);
160 refreshNetconfDeviceContext(dataModifIdent, rootNode.getDataAfter());
// Presumably the WRITE case (label not visible — confirm): a node whose identifier is already tracked is
// refreshed rather than started a second time.
163 if (contexts.containsKey(dataModifIdent)) {
164 LOG.debug("RemoteDevice{{}} was already configured, reconfiguring node...", nodeId);
165 refreshNetconfDeviceContext(dataModifIdent, rootNode.getDataAfter());
// Presumably the else branch (confirm): an untracked node gets a new context.
167 LOG.debug("Config for node {} created", nodeId);
168 startNetconfDeviceContext(dataModifIdent, rootNode.getDataAfter());
// Node removed, per the log message: stop and drop whatever is tracked for it.
172 LOG.debug("Config for node {} deleted", nodeId);
173 stopNetconfDeviceContext(dataModifIdent);
// Presumably the default branch (confirm): any other modification type is only logged.
176 LOG.warn("Unknown operation for {}.", nodeId);
181 private void refreshNetconfDeviceContext(final InstanceIdentifier<Node> instanceIdentifier, final Node node) {
182 final NetconfTopologyContext context = contexts.get(instanceIdentifier);
183 context.refresh(createSetup(instanceIdentifier, node));
186 // ClusterSingletonServiceRegistration registerClusterSingletonService method throws a Runtime exception if there
187 // are problems with registration and client has to deal with it. Only thing we can do if this error occurs is to
188 // retry registration several times and log the error.
189 // TODO change to a specific documented Exception when changed in ClusterSingletonServiceProvider
// Creates a topology context for the node and registers it as a cluster singleton service. On success the
// registration and the context are stored under the node's instance identifier.
// NOTE(review): the retry-loop control lines (counter, loop header, try, break, give-up condition) are not present
// in this view; how many attempts are made cannot be confirmed from here.
190 @SuppressWarnings("checkstyle:IllegalCatch")
191 private void startNetconfDeviceContext(final InstanceIdentifier<Node> instanceIdentifier, final Node node) {
// Fails with NullPointerException when the node carries no NetconfNode augmentation.
192 final NetconfNode netconfNode = requireNonNull(node.augmentation(NetconfNode.class));
// The node's actor response wait time is interpreted as seconds.
194 final Timeout actorResponseWaitTime = Timeout.create(
195 Duration.ofSeconds(netconfNode.getActorResponseWaitTime().toJava()));
// The service group identifier is derived from the string form of the node's instance identifier.
197 final ServiceGroupIdentifier serviceGroupIdent =
198 ServiceGroupIdentifier.create(instanceIdentifier.toString());
200 final NetconfTopologyContext newNetconfTopologyContext = newNetconfTopologyContext(
201 createSetup(instanceIdentifier, node), serviceGroupIdent, actorResponseWaitTime, deviceActionFactory);
// The registration is recorded before the context; both are stored only if registration did not throw.
206 final ClusterSingletonServiceRegistration clusterSingletonServiceRegistration =
207 clusterSingletonServiceProvider.registerClusterSingletonService(newNetconfTopologyContext);
208 clusterRegistrations.put(instanceIdentifier, clusterSingletonServiceRegistration);
209 contexts.put(instanceIdentifier, newNetconfTopologyContext);
// Registration failures are logged; the messages indicate another attempt follows.
211 } catch (final RuntimeException e) {
212 LOG.warn("Unable to register cluster singleton service {}, trying again", newNetconfTopologyContext, e);
// Final failure: the never-registered context is closed and, as far as visible here, not stored in contexts.
215 LOG.error("Unable to register cluster singleton service {} - done trying, closing topology context",
216 newNetconfTopologyContext, e);
217 close(newNetconfTopologyContext);
224 private void stopNetconfDeviceContext(final InstanceIdentifier<Node> instanceIdentifier) {
225 final NetconfTopologyContext netconfTopologyContext = contexts.remove(instanceIdentifier);
226 if (netconfTopologyContext != null) {
227 close(clusterRegistrations.remove(instanceIdentifier));
228 close(netconfTopologyContext);
233 protected NetconfTopologyContext newNetconfTopologyContext(final NetconfTopologySetup setup,
234 final ServiceGroupIdentifier serviceGroupIdent, final Timeout actorResponseWaitTime,
235 final DeviceActionFactory deviceActionFact) {
236 return new NetconfTopologyContext(setup.getTopologyId(), setup.getNetconfClientDispatcher(),
237 setup.getEventExecutor(), keepaliveExecutor,
238 processingExecutor, resourceManager,
239 dataBroker, mountPointService,
240 encryptionService, deviceActionFactory,
241 baseSchemas, actorResponseWaitTime,
242 serviceGroupIdent, setup, credentialProvider, sslHandlerFactoryProvider);
// Shuts the manager down: releases the RPC registration and the data tree change listener registration (each only
// if set), then closes every tracked topology context and every cluster singleton registration.
// NOTE(review): some statements of this method (the body of the rpcReg check, closing braces, and whatever sits
// between the two forEach calls and the final clear()) are not present in this view.
246 public void close() {
247 if (rpcReg != null) {
// The listener registration is closed and then nulled.
251 if (dataChangeListenerRegistration != null) {
252 dataChangeListenerRegistration.close();
253 dataChangeListenerRegistration = null;
// Contexts are closed before the cluster registrations, via the static close(AutoCloseable) helper.
256 contexts.values().forEach(NetconfTopologyManager::close);
257 clusterRegistrations.values().forEach(NetconfTopologyManager::close);
// NOTE(review): only the registrations map is visibly cleared here; confirm the contexts map is cleared too.
260 clusterRegistrations.clear();
// Closing helper used for contexts and registrations: any Exception raised is caught and logged at WARN together
// with the object being closed, rather than propagated.
// NOTE(review): the try block itself is not present in this view; presumably it calls closeable.close() — confirm.
263 @SuppressWarnings("checkstyle:IllegalCatch")
264 private static void close(final AutoCloseable closeable) {
267 } catch (Exception e) {
268 LOG.warn("Error closing {}", closeable, e);
272 private ListenerRegistration<NetconfTopologyManager> registerDataTreeChangeListener() {
273 final WriteTransaction wtx = dataBroker.newWriteOnlyTransaction();
274 // FIXME: how does this play out with lifecycle? In a cluster, someone needs to ensure this call happens, but
275 // also we need to to make sure config -> oper is properly synchronized. Non-clustered case relies on
276 // oper being transient and perhaps on a put() instead, how do we handle that in the clustered case?
277 wtx.merge(LogicalDatastoreType.OPERATIONAL, InstanceIdentifier.builder(NetworkTopology.class)
278 .child(Topology.class, new TopologyKey(new TopologyId(topologyId)))
279 .build(), new TopologyBuilder().setTopologyId(new TopologyId(topologyId)).build());
280 wtx.commit().addCallback(new FutureCallback<CommitInfo>() {
282 public void onSuccess(final CommitInfo result) {
283 LOG.debug("topology initialization successful");
287 public void onFailure(final Throwable throwable) {
288 LOG.error("Unable to initialize netconf-topology", throwable);
290 }, MoreExecutors.directExecutor());
292 LOG.debug("Registering datastore listener");
293 return dataBroker.registerDataTreeChangeListener(DataTreeIdentifier.create(LogicalDatastoreType.CONFIGURATION,
294 NetconfTopologyUtils.createTopologyListPath(topologyId).child(Node.class)), this);
// Assembles the NetconfTopologySetup for one node from the manager's collaborators plus node-specific values
// (instance identifier, remote device id inputs, schema resources for the node's schema cache directory).
// NOTE(review): some builder lines, the continuation of the getSchemaResources(...) call and the end of the
// statement are not present in this view; only the visible setters are described.
297 private NetconfTopologySetup createSetup(final InstanceIdentifier<Node> instanceIdentifier, final Node node) {
// No null check on the augmentation here; netconfNode is dereferenced below when resolving schema resources.
298 final NetconfNode netconfNode = node.augmentation(NetconfNode.class);
// NOTE(review): deviceId is not used in any line visible here; presumably it feeds the missing
// getSchemaResources(...) continuation — confirm.
299 final RemoteDeviceId deviceId = NetconfNodeUtils.toRemoteDeviceId(node.getNodeId(), netconfNode);
301 return NetconfTopologySetupBuilder.create()
302 .setClusterSingletonServiceProvider(clusterSingletonServiceProvider)
303 .setBaseSchemas(baseSchemas)
304 .setDataBroker(dataBroker)
305 .setInstanceIdentifier(instanceIdentifier)
306 .setRpcProviderRegistry(rpcProviderRegistry)
307 .setActionProviderRegistry(actionProviderRegistry)
309 .setActorSystem(actorSystem)
310 .setEventExecutor(eventExecutor)
// The setup receives the extracted executor services, not the thread-pool wrappers.
311 .setKeepaliveExecutor(keepaliveExecutorService)
312 .setProcessingExecutor(processingExecutorService)
313 .setTopologyId(topologyId)
314 .setNetconfClientDispatcher(clientDispatcher)
315 .setSchemaResourceDTO(resourceManager.getSchemaResources(netconfNode.getSchemaCacheDirectory(),
317 .setIdleTimeout(writeTxIdleTimeout)
318 .setEncryptionService(encryptionService)
319 .setCredentialProvider(credentialProvider)
320 .setSslHandlerFactoryProvider(sslHandlerFactoryProvider)