2 * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.netconf.topology.singleton.impl;
10 import static java.util.Objects.requireNonNull;
12 import akka.actor.ActorSystem;
13 import akka.util.Timeout;
14 import com.google.common.annotations.VisibleForTesting;
15 import com.google.common.util.concurrent.FutureCallback;
16 import com.google.common.util.concurrent.MoreExecutors;
17 import io.netty.util.concurrent.EventExecutor;
18 import java.time.Duration;
19 import java.util.Collection;
21 import java.util.concurrent.ConcurrentHashMap;
22 import java.util.concurrent.Executor;
23 import java.util.concurrent.ScheduledExecutorService;
24 import org.opendaylight.aaa.encrypt.AAAEncryptionService;
25 import org.opendaylight.controller.cluster.ActorSystemProvider;
26 import org.opendaylight.controller.config.threadpool.ScheduledThreadPool;
27 import org.opendaylight.controller.config.threadpool.ThreadPool;
28 import org.opendaylight.mdsal.binding.api.ClusteredDataTreeChangeListener;
29 import org.opendaylight.mdsal.binding.api.DataBroker;
30 import org.opendaylight.mdsal.binding.api.DataObjectModification;
31 import org.opendaylight.mdsal.binding.api.DataTreeIdentifier;
32 import org.opendaylight.mdsal.binding.api.DataTreeModification;
33 import org.opendaylight.mdsal.binding.api.RpcProviderService;
34 import org.opendaylight.mdsal.binding.api.WriteTransaction;
35 import org.opendaylight.mdsal.common.api.CommitInfo;
36 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
37 import org.opendaylight.mdsal.dom.api.DOMActionProviderService;
38 import org.opendaylight.mdsal.dom.api.DOMMountPointService;
39 import org.opendaylight.mdsal.dom.api.DOMRpcProviderService;
40 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
41 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceRegistration;
42 import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier;
43 import org.opendaylight.netconf.client.NetconfClientDispatcher;
44 import org.opendaylight.netconf.client.mdsal.api.BaseNetconfSchemas;
45 import org.opendaylight.netconf.client.mdsal.api.DeviceActionFactory;
46 import org.opendaylight.netconf.client.mdsal.api.RemoteDeviceId;
47 import org.opendaylight.netconf.client.mdsal.api.SchemaResourceManager;
48 import org.opendaylight.netconf.topology.singleton.api.NetconfTopologySingletonService;
49 import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup;
50 import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup.NetconfTopologySetupBuilder;
51 import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologyUtils;
52 import org.opendaylight.netconf.topology.spi.NetconfClientConfigurationBuilderFactory;
53 import org.opendaylight.netconf.topology.spi.NetconfNodeUtils;
54 import org.opendaylight.netconf.topology.spi.NetconfTopologyRPCProvider;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev221225.NetconfNode;
56 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev221225.NetconfNodeTopologyService;
57 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.topology.singleton.config.rev170419.Config;
58 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology;
59 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
60 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.TopologyId;
61 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
62 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyBuilder;
63 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey;
64 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
65 import org.opendaylight.yangtools.concepts.ListenerRegistration;
66 import org.opendaylight.yangtools.concepts.Registration;
67 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
68 import org.slf4j.Logger;
69 import org.slf4j.LoggerFactory;
71 public class NetconfTopologyManager
72 implements ClusteredDataTreeChangeListener<Node>, NetconfTopologySingletonService, AutoCloseable {
74 private static final Logger LOG = LoggerFactory.getLogger(NetconfTopologyManager.class);
76 private final Map<InstanceIdentifier<Node>, NetconfTopologyContext> contexts = new ConcurrentHashMap<>();
77 private final Map<InstanceIdentifier<Node>, ClusterSingletonServiceRegistration>
78 clusterRegistrations = new ConcurrentHashMap<>();
80 private final BaseNetconfSchemas baseSchemas;
81 private final DataBroker dataBroker;
82 private final DOMRpcProviderService rpcProviderRegistry;
83 private final DOMActionProviderService actionProviderRegistry;
84 private final ClusterSingletonServiceProvider clusterSingletonServiceProvider;
85 private final ScheduledThreadPool keepaliveExecutor;
86 private final ScheduledExecutorService keepaliveExecutorService;
87 private final ThreadPool processingExecutor;
88 private final Executor processingExecutorService;
89 private final ActorSystem actorSystem;
90 private final EventExecutor eventExecutor;
91 private final NetconfClientDispatcher clientDispatcher;
92 private final String topologyId;
93 private final Duration writeTxIdleTimeout;
94 private final DOMMountPointService mountPointService;
95 private final AAAEncryptionService encryptionService;
96 private final RpcProviderService rpcProviderService;
97 private final DeviceActionFactory deviceActionFactory;
98 private final NetconfClientConfigurationBuilderFactory builderFactory;
99 private final SchemaResourceManager resourceManager;
101 private ListenerRegistration<NetconfTopologyManager> dataChangeListenerRegistration;
102 private Registration rpcReg;
105 public NetconfTopologyManager(final BaseNetconfSchemas baseSchemas, final DataBroker dataBroker,
106 final DOMRpcProviderService rpcProviderRegistry,
107 final DOMActionProviderService actionProviderService,
108 final ClusterSingletonServiceProvider clusterSingletonServiceProvider,
109 final ScheduledThreadPool keepaliveExecutor, final ThreadPool processingExecutor,
110 final ActorSystemProvider actorSystemProvider,
111 final EventExecutor eventExecutor, final NetconfClientDispatcher clientDispatcher,
112 final String topologyId, final Config config,
113 final DOMMountPointService mountPointService,
114 final AAAEncryptionService encryptionService,
115 final RpcProviderService rpcProviderService,
116 final DeviceActionFactory deviceActionFactory,
117 final SchemaResourceManager resourceManager,
118 final NetconfClientConfigurationBuilderFactory builderFactory) {
119 this.baseSchemas = requireNonNull(baseSchemas);
120 this.dataBroker = requireNonNull(dataBroker);
121 this.rpcProviderRegistry = requireNonNull(rpcProviderRegistry);
122 actionProviderRegistry = requireNonNull(actionProviderService);
123 this.clusterSingletonServiceProvider = requireNonNull(clusterSingletonServiceProvider);
124 this.keepaliveExecutor = keepaliveExecutor;
125 keepaliveExecutorService = keepaliveExecutor.getExecutor();
126 this.processingExecutor = processingExecutor;
127 processingExecutorService = processingExecutor.getExecutor();
128 actorSystem = requireNonNull(actorSystemProvider).getActorSystem();
129 this.eventExecutor = requireNonNull(eventExecutor);
130 this.clientDispatcher = requireNonNull(clientDispatcher);
131 this.topologyId = requireNonNull(topologyId);
132 writeTxIdleTimeout = Duration.ofSeconds(config.getWriteTransactionIdleTimeout().toJava());
133 this.mountPointService = mountPointService;
134 this.encryptionService = requireNonNull(encryptionService);
135 this.rpcProviderService = requireNonNull(rpcProviderService);
136 this.deviceActionFactory = requireNonNull(deviceActionFactory);
137 this.resourceManager = requireNonNull(resourceManager);
138 this.builderFactory = requireNonNull(builderFactory);
141 // Blueprint init method
143 dataChangeListenerRegistration = registerDataTreeChangeListener();
144 rpcReg = rpcProviderService.registerRpcImplementation(NetconfNodeTopologyService.class,
145 new NetconfTopologyRPCProvider(dataBroker, encryptionService, topologyId));
149 public void onDataTreeChanged(final Collection<DataTreeModification<Node>> changes) {
150 for (final DataTreeModification<Node> change : changes) {
151 final DataObjectModification<Node> rootNode = change.getRootNode();
152 final InstanceIdentifier<Node> dataModifIdent = change.getRootPath().getRootIdentifier();
153 final NodeId nodeId = NetconfTopologyUtils.getNodeId(rootNode.getIdentifier());
154 switch (rootNode.getModificationType()) {
155 case SUBTREE_MODIFIED:
156 LOG.debug("Config for node {} updated", nodeId);
157 refreshNetconfDeviceContext(dataModifIdent, rootNode.getDataAfter());
160 if (contexts.containsKey(dataModifIdent)) {
161 LOG.debug("RemoteDevice{{}} was already configured, reconfiguring node...", nodeId);
162 refreshNetconfDeviceContext(dataModifIdent, rootNode.getDataAfter());
164 LOG.debug("Config for node {} created", nodeId);
165 startNetconfDeviceContext(dataModifIdent, rootNode.getDataAfter());
169 LOG.debug("Config for node {} deleted", nodeId);
170 stopNetconfDeviceContext(dataModifIdent);
173 LOG.warn("Unknown operation for {}.", nodeId);
178 private void refreshNetconfDeviceContext(final InstanceIdentifier<Node> instanceIdentifier, final Node node) {
179 final NetconfTopologyContext context = contexts.get(instanceIdentifier);
180 context.refresh(createSetup(instanceIdentifier, node));
183 // ClusterSingletonServiceRegistration registerClusterSingletonService method throws a Runtime exception if there
184 // are problems with registration and client has to deal with it. Only thing we can do if this error occurs is to
185 // retry registration several times and log the error.
186 // TODO change to a specific documented Exception when changed in ClusterSingletonServiceProvider
187 @SuppressWarnings("checkstyle:IllegalCatch")
188 private void startNetconfDeviceContext(final InstanceIdentifier<Node> instanceIdentifier, final Node node) {
189 final NetconfNode netconfNode = requireNonNull(node.augmentation(NetconfNode.class));
191 final Timeout actorResponseWaitTime = Timeout.create(
192 Duration.ofSeconds(netconfNode.getActorResponseWaitTime().toJava()));
194 final ServiceGroupIdentifier serviceGroupIdent =
195 ServiceGroupIdentifier.create(instanceIdentifier.toString());
197 final NetconfTopologyContext newNetconfTopologyContext = newNetconfTopologyContext(
198 createSetup(instanceIdentifier, node), serviceGroupIdent, actorResponseWaitTime, deviceActionFactory);
203 final ClusterSingletonServiceRegistration clusterSingletonServiceRegistration =
204 clusterSingletonServiceProvider.registerClusterSingletonService(newNetconfTopologyContext);
205 clusterRegistrations.put(instanceIdentifier, clusterSingletonServiceRegistration);
206 contexts.put(instanceIdentifier, newNetconfTopologyContext);
208 } catch (final RuntimeException e) {
209 LOG.warn("Unable to register cluster singleton service {}, trying again", newNetconfTopologyContext, e);
212 LOG.error("Unable to register cluster singleton service {} - done trying, closing topology context",
213 newNetconfTopologyContext, e);
214 close(newNetconfTopologyContext);
221 private void stopNetconfDeviceContext(final InstanceIdentifier<Node> instanceIdentifier) {
222 final NetconfTopologyContext netconfTopologyContext = contexts.remove(instanceIdentifier);
223 if (netconfTopologyContext != null) {
224 close(clusterRegistrations.remove(instanceIdentifier));
225 close(netconfTopologyContext);
230 protected NetconfTopologyContext newNetconfTopologyContext(final NetconfTopologySetup setup,
231 final ServiceGroupIdentifier serviceGroupIdent, final Timeout actorResponseWaitTime,
232 final DeviceActionFactory deviceActionFact) {
233 return new NetconfTopologyContext(setup.getNetconfClientDispatcher(),
234 setup.getEventExecutor(), keepaliveExecutor,
235 processingExecutor, resourceManager,
236 dataBroker, mountPointService,
237 builderFactory, deviceActionFactory,
238 baseSchemas, actorResponseWaitTime,
239 serviceGroupIdent, setup);
243 public void close() {
244 if (rpcReg != null) {
248 if (dataChangeListenerRegistration != null) {
249 dataChangeListenerRegistration.close();
250 dataChangeListenerRegistration = null;
253 contexts.values().forEach(NetconfTopologyManager::close);
254 clusterRegistrations.values().forEach(NetconfTopologyManager::close);
257 clusterRegistrations.clear();
260 @SuppressWarnings("checkstyle:IllegalCatch")
261 private static void close(final AutoCloseable closeable) {
264 } catch (Exception e) {
265 LOG.warn("Error closing {}", closeable, e);
269 private ListenerRegistration<NetconfTopologyManager> registerDataTreeChangeListener() {
270 final WriteTransaction wtx = dataBroker.newWriteOnlyTransaction();
271 // FIXME: how does this play out with lifecycle? In a cluster, someone needs to ensure this call happens, but
272 // also we need to to make sure config -> oper is properly synchronized. Non-clustered case relies on
273 // oper being transient and perhaps on a put() instead, how do we handle that in the clustered case?
274 wtx.merge(LogicalDatastoreType.OPERATIONAL, InstanceIdentifier.builder(NetworkTopology.class)
275 .child(Topology.class, new TopologyKey(new TopologyId(topologyId)))
276 .build(), new TopologyBuilder().setTopologyId(new TopologyId(topologyId)).build());
277 wtx.commit().addCallback(new FutureCallback<CommitInfo>() {
279 public void onSuccess(final CommitInfo result) {
280 LOG.debug("topology initialization successful");
284 public void onFailure(final Throwable throwable) {
285 LOG.error("Unable to initialize netconf-topology", throwable);
287 }, MoreExecutors.directExecutor());
289 LOG.debug("Registering datastore listener");
290 return dataBroker.registerDataTreeChangeListener(DataTreeIdentifier.create(LogicalDatastoreType.CONFIGURATION,
291 NetconfTopologyUtils.createTopologyListPath(topologyId).child(Node.class)), this);
294 private NetconfTopologySetup createSetup(final InstanceIdentifier<Node> instanceIdentifier, final Node node) {
295 final NetconfNode netconfNode = node.augmentation(NetconfNode.class);
296 final RemoteDeviceId deviceId = NetconfNodeUtils.toRemoteDeviceId(node.getNodeId(), netconfNode);
298 return NetconfTopologySetupBuilder.create()
299 .setClusterSingletonServiceProvider(clusterSingletonServiceProvider)
300 .setBaseSchemas(baseSchemas)
301 .setDataBroker(dataBroker)
302 .setInstanceIdentifier(instanceIdentifier)
303 .setRpcProviderRegistry(rpcProviderRegistry)
304 .setActionProviderRegistry(actionProviderRegistry)
306 .setActorSystem(actorSystem)
307 .setEventExecutor(eventExecutor)
308 .setKeepaliveExecutor(keepaliveExecutorService)
309 .setProcessingExecutor(processingExecutorService)
310 .setTopologyId(topologyId)
311 .setNetconfClientDispatcher(clientDispatcher)
312 .setSchemaResourceDTO(resourceManager.getSchemaResources(netconfNode.getSchemaCacheDirectory(),
314 .setIdleTimeout(writeTxIdleTimeout)