2 * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.netconf.topology.singleton.impl;
10 import static java.util.Objects.requireNonNull;
12 import akka.actor.ActorSystem;
13 import akka.util.Timeout;
14 import com.google.common.annotations.VisibleForTesting;
15 import com.google.common.util.concurrent.FutureCallback;
16 import com.google.common.util.concurrent.ListeningExecutorService;
17 import com.google.common.util.concurrent.MoreExecutors;
18 import io.netty.util.concurrent.EventExecutor;
19 import java.time.Duration;
20 import java.util.Collection;
22 import java.util.concurrent.ConcurrentHashMap;
23 import java.util.concurrent.ScheduledExecutorService;
24 import org.opendaylight.aaa.encrypt.AAAEncryptionService;
25 import org.opendaylight.controller.cluster.ActorSystemProvider;
26 import org.opendaylight.controller.config.threadpool.ScheduledThreadPool;
27 import org.opendaylight.controller.config.threadpool.ThreadPool;
28 import org.opendaylight.mdsal.binding.api.ClusteredDataTreeChangeListener;
29 import org.opendaylight.mdsal.binding.api.DataBroker;
30 import org.opendaylight.mdsal.binding.api.DataObjectModification;
31 import org.opendaylight.mdsal.binding.api.DataTreeIdentifier;
32 import org.opendaylight.mdsal.binding.api.DataTreeModification;
33 import org.opendaylight.mdsal.binding.api.RpcProviderService;
34 import org.opendaylight.mdsal.binding.api.WriteTransaction;
35 import org.opendaylight.mdsal.common.api.CommitInfo;
36 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
37 import org.opendaylight.mdsal.dom.api.DOMActionProviderService;
38 import org.opendaylight.mdsal.dom.api.DOMMountPointService;
39 import org.opendaylight.mdsal.dom.api.DOMRpcProviderService;
40 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
41 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceRegistration;
42 import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier;
43 import org.opendaylight.netconf.client.NetconfClientDispatcher;
44 import org.opendaylight.netconf.client.mdsal.api.BaseNetconfSchemas;
45 import org.opendaylight.netconf.client.mdsal.api.DeviceActionFactory;
46 import org.opendaylight.netconf.client.mdsal.api.NetconfKeystoreAdapter;
47 import org.opendaylight.netconf.client.mdsal.api.RemoteDeviceId;
48 import org.opendaylight.netconf.client.mdsal.api.SchemaResourceManager;
49 import org.opendaylight.netconf.topology.singleton.api.NetconfTopologySingletonService;
50 import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup;
51 import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup.NetconfTopologySetupBuilder;
52 import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologyUtils;
53 import org.opendaylight.netconf.topology.spi.NetconfNodeUtils;
54 import org.opendaylight.netconf.topology.spi.NetconfTopologyRPCProvider;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev221225.NetconfNode;
56 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev221225.NetconfNodeTopologyService;
57 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.topology.singleton.config.rev170419.Config;
58 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology;
59 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
60 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.TopologyId;
61 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
62 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyBuilder;
63 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey;
64 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
65 import org.opendaylight.yangtools.concepts.ListenerRegistration;
66 import org.opendaylight.yangtools.concepts.Registration;
67 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
68 import org.slf4j.Logger;
69 import org.slf4j.LoggerFactory;
71 public class NetconfTopologyManager
72 implements ClusteredDataTreeChangeListener<Node>, NetconfTopologySingletonService, AutoCloseable {
74 private static final Logger LOG = LoggerFactory.getLogger(NetconfTopologyManager.class);
76 private final Map<InstanceIdentifier<Node>, NetconfTopologyContext> contexts = new ConcurrentHashMap<>();
77 private final Map<InstanceIdentifier<Node>, ClusterSingletonServiceRegistration>
78 clusterRegistrations = new ConcurrentHashMap<>();
80 private final BaseNetconfSchemas baseSchemas;
81 private final DataBroker dataBroker;
82 private final DOMRpcProviderService rpcProviderRegistry;
83 private final DOMActionProviderService actionProviderRegistry;
84 private final ClusterSingletonServiceProvider clusterSingletonServiceProvider;
85 private final ScheduledExecutorService keepaliveExecutor;
86 private final ListeningExecutorService processingExecutor;
87 private final ActorSystem actorSystem;
88 private final EventExecutor eventExecutor;
89 private final NetconfClientDispatcher clientDispatcher;
90 private final String topologyId;
91 private final Duration writeTxIdleTimeout;
92 private final DOMMountPointService mountPointService;
93 private final AAAEncryptionService encryptionService;
94 private final RpcProviderService rpcProviderService;
95 private final DeviceActionFactory deviceActionFactory;
96 private final NetconfKeystoreAdapter keystoreAdapter;
97 private final SchemaResourceManager resourceManager;
99 private ListenerRegistration<NetconfTopologyManager> dataChangeListenerRegistration;
100 private Registration rpcReg;
/**
 * Captures the services and settings this manager works with.
 *
 * <p>Most arguments are rejected with a {@link NullPointerException} when {@code null}. The two thread pools
 * are unwrapped to their executors (the processing executor is additionally wrapped in a listening decorator),
 * the actor system is obtained from its provider, and the write-transaction idle timeout is read from
 * {@code config} as a number of seconds. {@code mountPointService} is stored as given, without a null check.
 */
public NetconfTopologyManager(final BaseNetconfSchemas baseSchemas, final DataBroker dataBroker,
        final DOMRpcProviderService rpcProviderRegistry,
        final DOMActionProviderService actionProviderService,
        final ClusterSingletonServiceProvider clusterSingletonServiceProvider,
        final ScheduledThreadPool keepaliveExecutor, final ThreadPool processingExecutor,
        final ActorSystemProvider actorSystemProvider,
        final EventExecutor eventExecutor, final NetconfClientDispatcher clientDispatcher,
        final String topologyId, final Config config,
        final DOMMountPointService mountPointService,
        final AAAEncryptionService encryptionService,
        final RpcProviderService rpcProviderService,
        final DeviceActionFactory deviceActionFactory,
        final SchemaResourceManager resourceManager,
        final NetconfKeystoreAdapter keystoreAdapter) {
    // Datastore, RPC/action and clustering services
    this.baseSchemas = requireNonNull(baseSchemas);
    this.dataBroker = requireNonNull(dataBroker);
    this.rpcProviderRegistry = requireNonNull(rpcProviderRegistry);
    this.actionProviderRegistry = requireNonNull(actionProviderService);
    this.clusterSingletonServiceProvider = requireNonNull(clusterSingletonServiceProvider);

    // Execution resources: pools are unwrapped here so the rest of the class deals with plain executors
    this.keepaliveExecutor = keepaliveExecutor.getExecutor();
    this.processingExecutor = MoreExecutors.listeningDecorator(processingExecutor.getExecutor());
    this.actorSystem = requireNonNull(actorSystemProvider).getActorSystem();
    this.eventExecutor = requireNonNull(eventExecutor);
    this.clientDispatcher = requireNonNull(clientDispatcher);

    // Topology identity and tunables
    this.topologyId = requireNonNull(topologyId);
    this.writeTxIdleTimeout = Duration.ofSeconds(config.getWriteTransactionIdleTimeout().toJava());

    // Remaining collaborators
    this.mountPointService = mountPointService;
    this.encryptionService = requireNonNull(encryptionService);
    this.rpcProviderService = requireNonNull(rpcProviderService);
    this.deviceActionFactory = requireNonNull(deviceActionFactory);
    this.resourceManager = requireNonNull(resourceManager);
    this.keystoreAdapter = requireNonNull(keystoreAdapter);
}
136 // Blueprint init method
138 dataChangeListenerRegistration = registerDataTreeChangeListener();
139 rpcReg = rpcProviderService.registerRpcImplementation(NetconfNodeTopologyService.class,
140 new NetconfTopologyRPCProvider(dataBroker, encryptionService, topologyId));
144 public void onDataTreeChanged(final Collection<DataTreeModification<Node>> changes) {
145 for (final DataTreeModification<Node> change : changes) {
146 final DataObjectModification<Node> rootNode = change.getRootNode();
147 final InstanceIdentifier<Node> dataModifIdent = change.getRootPath().getRootIdentifier();
148 final NodeId nodeId = NetconfTopologyUtils.getNodeId(rootNode.getIdentifier());
149 switch (rootNode.getModificationType()) {
150 case SUBTREE_MODIFIED:
151 LOG.debug("Config for node {} updated", nodeId);
152 refreshNetconfDeviceContext(dataModifIdent, rootNode.getDataAfter());
155 if (contexts.containsKey(dataModifIdent)) {
156 LOG.debug("RemoteDevice{{}} was already configured, reconfiguring node...", nodeId);
157 refreshNetconfDeviceContext(dataModifIdent, rootNode.getDataAfter());
159 LOG.debug("Config for node {} created", nodeId);
160 startNetconfDeviceContext(dataModifIdent, rootNode.getDataAfter());
164 LOG.debug("Config for node {} deleted", nodeId);
165 stopNetconfDeviceContext(dataModifIdent);
168 LOG.warn("Unknown operation for {}.", nodeId);
173 private void refreshNetconfDeviceContext(final InstanceIdentifier<Node> instanceIdentifier, final Node node) {
174 final NetconfTopologyContext context = contexts.get(instanceIdentifier);
175 context.refresh(createSetup(instanceIdentifier, node));
178 // ClusterSingletonServiceRegistration registerClusterSingletonService method throws a Runtime exception if there
179 // are problems with registration and client has to deal with it. Only thing we can do if this error occurs is to
180 // retry registration several times and log the error.
181 // TODO change to a specific documented Exception when changed in ClusterSingletonServiceProvider
182 @SuppressWarnings("checkstyle:IllegalCatch")
183 private void startNetconfDeviceContext(final InstanceIdentifier<Node> instanceIdentifier, final Node node) {
184 final NetconfNode netconfNode = requireNonNull(node.augmentation(NetconfNode.class));
186 final Timeout actorResponseWaitTime = Timeout.create(
187 Duration.ofSeconds(netconfNode.getActorResponseWaitTime().toJava()));
189 final ServiceGroupIdentifier serviceGroupIdent =
190 ServiceGroupIdentifier.create(instanceIdentifier.toString());
192 final NetconfTopologyContext newNetconfTopologyContext = newNetconfTopologyContext(
193 createSetup(instanceIdentifier, node), serviceGroupIdent, actorResponseWaitTime, deviceActionFactory);
198 final ClusterSingletonServiceRegistration clusterSingletonServiceRegistration =
199 clusterSingletonServiceProvider.registerClusterSingletonService(newNetconfTopologyContext);
200 clusterRegistrations.put(instanceIdentifier, clusterSingletonServiceRegistration);
201 contexts.put(instanceIdentifier, newNetconfTopologyContext);
203 } catch (final RuntimeException e) {
204 LOG.warn("Unable to register cluster singleton service {}, trying again", newNetconfTopologyContext, e);
207 LOG.error("Unable to register cluster singleton service {} - done trying, closing topology context",
208 newNetconfTopologyContext, e);
209 close(newNetconfTopologyContext);
216 private void stopNetconfDeviceContext(final InstanceIdentifier<Node> instanceIdentifier) {
217 final NetconfTopologyContext netconfTopologyContext = contexts.remove(instanceIdentifier);
218 if (netconfTopologyContext != null) {
219 close(clusterRegistrations.remove(instanceIdentifier));
220 close(netconfTopologyContext);
225 protected NetconfTopologyContext newNetconfTopologyContext(final NetconfTopologySetup setup,
226 final ServiceGroupIdentifier serviceGroupIdent, final Timeout actorResponseWaitTime,
227 final DeviceActionFactory deviceActionFact) {
228 return new NetconfTopologyContext(setup, serviceGroupIdent, actorResponseWaitTime, mountPointService,
233 public void close() {
234 if (rpcReg != null) {
238 if (dataChangeListenerRegistration != null) {
239 dataChangeListenerRegistration.close();
240 dataChangeListenerRegistration = null;
243 contexts.values().forEach(NetconfTopologyManager::close);
244 clusterRegistrations.values().forEach(NetconfTopologyManager::close);
247 clusterRegistrations.clear();
250 @SuppressWarnings("checkstyle:IllegalCatch")
251 private static void close(final AutoCloseable closeable) {
254 } catch (Exception e) {
255 LOG.warn("Error closing {}", closeable, e);
259 private ListenerRegistration<NetconfTopologyManager> registerDataTreeChangeListener() {
260 final WriteTransaction wtx = dataBroker.newWriteOnlyTransaction();
261 // FIXME: how does this play out with lifecycle? In a cluster, someone needs to ensure this call happens, but
262 // also we need to to make sure config -> oper is properly synchronized. Non-clustered case relies on
263 // oper being transient and perhaps on a put() instead, how do we handle that in the clustered case?
264 wtx.merge(LogicalDatastoreType.OPERATIONAL, InstanceIdentifier.builder(NetworkTopology.class)
265 .child(Topology.class, new TopologyKey(new TopologyId(topologyId)))
266 .build(), new TopologyBuilder().setTopologyId(new TopologyId(topologyId)).build());
267 wtx.commit().addCallback(new FutureCallback<CommitInfo>() {
269 public void onSuccess(final CommitInfo result) {
270 LOG.debug("topology initialization successful");
274 public void onFailure(final Throwable throwable) {
275 LOG.error("Unable to initialize netconf-topology", throwable);
277 }, MoreExecutors.directExecutor());
279 LOG.debug("Registering datastore listener");
280 return dataBroker.registerDataTreeChangeListener(DataTreeIdentifier.create(LogicalDatastoreType.CONFIGURATION,
281 NetconfTopologyUtils.createTopologyListPath(topologyId).child(Node.class)), this);
284 private NetconfTopologySetup createSetup(final InstanceIdentifier<Node> instanceIdentifier, final Node node) {
285 final NetconfNode netconfNode = node.augmentation(NetconfNode.class);
286 final RemoteDeviceId deviceId = NetconfNodeUtils.toRemoteDeviceId(node.getNodeId(), netconfNode);
288 return NetconfTopologySetupBuilder.create()
289 .setClusterSingletonServiceProvider(clusterSingletonServiceProvider)
290 .setBaseSchemas(baseSchemas)
291 .setDataBroker(dataBroker)
292 .setInstanceIdentifier(instanceIdentifier)
293 .setRpcProviderRegistry(rpcProviderRegistry)
294 .setActionProviderRegistry(actionProviderRegistry)
296 .setActorSystem(actorSystem)
297 .setEventExecutor(eventExecutor)
298 .setKeepaliveExecutor(keepaliveExecutor)
299 .setProcessingExecutor(processingExecutor)
300 .setTopologyId(topologyId)
301 .setNetconfClientDispatcher(clientDispatcher)
302 .setSchemaResourceDTO(resourceManager.getSchemaResources(netconfNode.getSchemaCacheDirectory(),
304 .setIdleTimeout(writeTxIdleTimeout)
305 .setEncryptionService(encryptionService)
306 .setKeystoreAdapter(keystoreAdapter)