2 * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.netconf.topology.singleton.impl;
10 import static java.util.Objects.requireNonNull;
12 import akka.actor.ActorSystem;
13 import akka.util.Timeout;
14 import com.google.common.annotations.VisibleForTesting;
15 import com.google.common.util.concurrent.FutureCallback;
16 import com.google.common.util.concurrent.ListeningExecutorService;
17 import com.google.common.util.concurrent.MoreExecutors;
18 import io.netty.util.concurrent.EventExecutor;
19 import java.util.Collection;
21 import java.util.concurrent.ConcurrentHashMap;
22 import java.util.concurrent.ScheduledExecutorService;
23 import java.util.concurrent.TimeUnit;
24 import org.opendaylight.aaa.encrypt.AAAEncryptionService;
25 import org.opendaylight.controller.cluster.ActorSystemProvider;
26 import org.opendaylight.controller.config.threadpool.ScheduledThreadPool;
27 import org.opendaylight.controller.config.threadpool.ThreadPool;
28 import org.opendaylight.mdsal.binding.api.ClusteredDataTreeChangeListener;
29 import org.opendaylight.mdsal.binding.api.DataBroker;
30 import org.opendaylight.mdsal.binding.api.DataObjectModification;
31 import org.opendaylight.mdsal.binding.api.DataTreeIdentifier;
32 import org.opendaylight.mdsal.binding.api.DataTreeModification;
33 import org.opendaylight.mdsal.binding.api.WriteTransaction;
34 import org.opendaylight.mdsal.common.api.CommitInfo;
35 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
36 import org.opendaylight.mdsal.dom.api.DOMActionProviderService;
37 import org.opendaylight.mdsal.dom.api.DOMMountPointService;
38 import org.opendaylight.mdsal.dom.api.DOMRpcProviderService;
39 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
40 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceRegistration;
41 import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier;
42 import org.opendaylight.netconf.client.NetconfClientDispatcher;
43 import org.opendaylight.netconf.sal.connect.api.DeviceActionFactory;
44 import org.opendaylight.netconf.sal.connect.api.SchemaResourceManager;
45 import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
46 import org.opendaylight.netconf.topology.singleton.api.NetconfTopologySingletonService;
47 import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup;
48 import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup.NetconfTopologySetupBuilder;
49 import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologyUtils;
50 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNode;
51 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.topology.singleton.config.rev170419.Config;
52 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology;
53 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopologyBuilder;
54 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
55 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.TopologyId;
56 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
57 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyBuilder;
58 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey;
59 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
60 import org.opendaylight.yangtools.concepts.ListenerRegistration;
61 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
62 import org.slf4j.Logger;
63 import org.slf4j.LoggerFactory;
64 import scala.concurrent.duration.Duration;
/**
 * Listens for configuration changes to {@link Node} entries of a single NETCONF topology and keeps one
 * {@link NetconfTopologyContext} per node, each registered with the cluster singleton service provider.
 */
66 public class NetconfTopologyManager
67 implements ClusteredDataTreeChangeListener<Node>, NetconfTopologySingletonService, AutoCloseable {
69 private static final Logger LOG = LoggerFactory.getLogger(NetconfTopologyManager.class);
// Contexts of the nodes whose singleton registration succeeded, keyed by the node's instance identifier.
// Entries are added by startNetconfDeviceContext and removed by stopNetconfDeviceContext.
71 private final Map<InstanceIdentifier<Node>, NetconfTopologyContext> contexts = new ConcurrentHashMap<>();
// Cluster singleton registrations, stored under the same key and at the same time as the matching context.
72 private final Map<InstanceIdentifier<Node>, ClusterSingletonServiceRegistration>
73 clusterRegistrations = new ConcurrentHashMap<>();
// Collaborators captured by the constructor; createSetup hands most of them to every node setup.
75 private final DataBroker dataBroker;
76 private final DOMRpcProviderService rpcProviderRegistry;
77 private final DOMActionProviderService actionProviderRegistry;
78 private final ClusterSingletonServiceProvider clusterSingletonServiceProvider;
79 private final ScheduledExecutorService keepaliveExecutor;
80 private final ListeningExecutorService processingExecutor;
81 private final ActorSystem actorSystem;
82 private final EventExecutor eventExecutor;
83 private final NetconfClientDispatcher clientDispatcher;
// Identifier of the topology whose nodes are watched and initialized.
84 private final String topologyId;
// Built in the constructor from Config#getWriteTransactionIdleTimeout(), interpreted as seconds.
85 private final Duration writeTxIdleTimeout;
// Passed to every new NetconfTopologyContext; the only constructor argument stored without a null check.
86 private final DOMMountPointService mountPointService;
87 private final AAAEncryptionService encryptionService;
88 private final DeviceActionFactory deviceActionFactory;
89 private final SchemaResourceManager resourceManager;
// Registration of this listener; assigned by the init method, closed and reset to null by close().
90 private ListenerRegistration<NetconfTopologyManager> dataChangeListenerRegistration;
// Optional private-key settings injected through the setters and forwarded to every node setup.
91 private String privateKeyPath;
92 private String privateKeyPassphrase;
/**
 * Captures the services this manager needs in order to build a {@link NetconfTopologySetup} for each
 * configured node. Nothing is registered or written here; the data tree change listener is registered
 * separately by the blueprint init method.
 *
 * @param dataBroker broker used to initialize the topology and to register the change listener
 * @param rpcProviderRegistry RPC provider service forwarded to every node setup
 * @param actionProviderService action provider service forwarded to every node setup
 * @param clusterSingletonServiceProvider provider with which each node context is registered
 * @param keepaliveExecutor pool whose executor is forwarded to every node setup as keepalive executor
 * @param processingExecutor pool whose executor is wrapped in a listening decorator and forwarded to
 *                           every node setup as processing executor
 * @param actorSystemProvider source of the actor system forwarded to every node setup
 * @param eventExecutor event executor forwarded to every node setup
 * @param clientDispatcher NETCONF client dispatcher forwarded to every node setup
 * @param topologyId identifier of the topology to manage
 * @param config supplies the write-transaction idle timeout, interpreted as seconds
 * @param mountPointService mount point service given to every node context; not null-checked
 * @param encryptionService encryption service forwarded to every node setup
 * @param deviceActionFactory factory given to every node context
 * @param resourceManager source of the schema resources for each node
 * @throws NullPointerException if an argument that is null-checked or dereferenced here is null
 */
public NetconfTopologyManager(
        final DataBroker dataBroker,
        final DOMRpcProviderService rpcProviderRegistry,
        final DOMActionProviderService actionProviderService,
        final ClusterSingletonServiceProvider clusterSingletonServiceProvider,
        final ScheduledThreadPool keepaliveExecutor,
        final ThreadPool processingExecutor,
        final ActorSystemProvider actorSystemProvider,
        final EventExecutor eventExecutor,
        final NetconfClientDispatcher clientDispatcher,
        final String topologyId,
        final Config config,
        final DOMMountPointService mountPointService,
        final AAAEncryptionService encryptionService,
        final DeviceActionFactory deviceActionFactory,
        final SchemaResourceManager resourceManager) {
    // Datastore, RPC/action and clustering services.
    this.dataBroker = requireNonNull(dataBroker);
    this.rpcProviderRegistry = requireNonNull(rpcProviderRegistry);
    this.actionProviderRegistry = requireNonNull(actionProviderService);
    this.clusterSingletonServiceProvider = requireNonNull(clusterSingletonServiceProvider);

    // Executors are unwrapped from their pools once, up front.
    this.keepaliveExecutor = keepaliveExecutor.getExecutor();
    this.processingExecutor = MoreExecutors.listeningDecorator(processingExecutor.getExecutor());
    this.actorSystem = requireNonNull(actorSystemProvider).getActorSystem();
    this.eventExecutor = requireNonNull(eventExecutor);

    // Client-side plumbing and topology-wide settings.
    this.clientDispatcher = requireNonNull(clientDispatcher);
    this.topologyId = requireNonNull(topologyId);
    this.writeTxIdleTimeout =
        Duration.apply(config.getWriteTransactionIdleTimeout().toJava(), TimeUnit.SECONDS);

    // The mount point service is stored as given, without a null check.
    this.mountPointService = mountPointService;
    this.encryptionService = requireNonNull(encryptionService);
    this.deviceActionFactory = requireNonNull(deviceActionFactory);
    this.resourceManager = requireNonNull(resourceManager);
}
122 // Blueprint init method: subscribes this manager to node changes and keeps the registration
// so that close() can release it later.
124 dataChangeListenerRegistration = registerDataTreeChangeListener();
/**
 * Reacts to changes of node configuration by refreshing, starting or stopping the context of each
 * affected node, one change at a time and in the order given.
 *
 * @param changes modifications of {@link Node} entries delivered by the data broker
 */
128 public void onDataTreeChanged(final Collection<DataTreeModification<Node>> changes) {
129 for (final DataTreeModification<Node> change : changes) {
130 final DataObjectModification<Node> rootNode = change.getRootNode();
// The node's instance identifier is the key used for the contexts and clusterRegistrations maps.
131 final InstanceIdentifier<Node> dataModifIdent = change.getRootPath().getRootIdentifier();
// The node id is only used for logging below.
132 final NodeId nodeId = NetconfTopologyUtils.getNodeId(rootNode.getIdentifier());
133 switch (rootNode.getModificationType()) {
134 case SUBTREE_MODIFIED:
// Something below an existing node changed: hand the new data to the node's context.
135 LOG.debug("Config for node {} updated", nodeId);
136 refreshNetconfDeviceContext(dataModifIdent, rootNode.getDataAfter());
// NOTE(review): the label of this branch is not visible here; judging by the log messages it is
// presumably the WRITE case. A node that already has a context is reconfigured in place,
// otherwise a new context is started.
139 if (contexts.containsKey(dataModifIdent)) {
140 LOG.debug("RemoteDevice{{}} was already configured, reconfiguring node...", nodeId);
141 refreshNetconfDeviceContext(dataModifIdent, rootNode.getDataAfter());
143 LOG.debug("Config for node {} created", nodeId);
144 startNetconfDeviceContext(dataModifIdent, rootNode.getDataAfter());
// NOTE(review): presumably the DELETE case (label not visible): tear the node's context down.
148 LOG.debug("Config for node {} deleted", nodeId);
149 stopNetconfDeviceContext(dataModifIdent);
// NOTE(review): presumably the default branch (label not visible): the change is only logged.
152 LOG.warn("Unknown operation for {}.", nodeId);
157 private void refreshNetconfDeviceContext(final InstanceIdentifier<Node> instanceIdentifier, final Node node) {
158 final NetconfTopologyContext context = contexts.get(instanceIdentifier);
159 context.refresh(createSetup(instanceIdentifier, node));
162 // ClusterSingletonServiceProvider#registerClusterSingletonService throws a RuntimeException if there are
163 // problems with the registration, and the caller has to deal with it. The only thing we can do when that
164 // happens is to retry the registration several times and log the error.
165 // TODO: catch a specific, documented exception once ClusterSingletonServiceProvider declares one.
166 @SuppressWarnings("checkstyle:IllegalCatch")
167 private void startNetconfDeviceContext(final InstanceIdentifier<Node> instanceIdentifier, final Node node) {
// Fail fast with a NullPointerException when the node carries no NETCONF augmentation, host or IP address.
168 final NetconfNode netconfNode = node.augmentation(NetconfNode.class);
169 requireNonNull(netconfNode);
170 requireNonNull(netconfNode.getHost());
171 requireNonNull(netconfNode.getHost().getIpAddress());
// The node's configured actor response wait time is interpreted as seconds.
173 final Timeout actorResponseWaitTime = new Timeout(
174 Duration.create(netconfNode.getActorResponseWaitTime().toJava(), "seconds"));
// The service group is named after the string form of the node's instance identifier.
176 final ServiceGroupIdentifier serviceGroupIdent =
177 ServiceGroupIdentifier.create(instanceIdentifier.toString());
// The context is created through the overridable factory method before any registration is attempted.
179 final NetconfTopologyContext newNetconfTopologyContext = newNetconfTopologyContext(
180 createSetup(instanceIdentifier, node), serviceGroupIdent, actorResponseWaitTime, deviceActionFactory);
// NOTE(review): the retry loop header and its attempt counter are not visible here; confirm the
// number of attempts against the full file.
// The registration and the context are stored only after the registration call has returned.
185 final ClusterSingletonServiceRegistration clusterSingletonServiceRegistration =
186 clusterSingletonServiceProvider.registerClusterSingletonService(newNetconfTopologyContext);
187 clusterRegistrations.put(instanceIdentifier, clusterSingletonServiceRegistration);
188 contexts.put(instanceIdentifier, newNetconfTopologyContext);
190 } catch (final RuntimeException e) {
191 LOG.warn("Unable to register cluster singleton service {}, trying again", newNetconfTopologyContext, e);
// Giving up: the context that was never registered is closed; nothing is stored for this node.
194 LOG.error("Unable to register cluster singleton service {} - done trying, closing topology context",
195 newNetconfTopologyContext, e);
196 close(newNetconfTopologyContext);
203 private void stopNetconfDeviceContext(final InstanceIdentifier<Node> instanceIdentifier) {
204 final NetconfTopologyContext netconfTopologyContext = contexts.remove(instanceIdentifier);
205 if (netconfTopologyContext != null) {
206 close(clusterRegistrations.remove(instanceIdentifier));
207 close(netconfTopologyContext);
/**
 * Factory for the per-node context used by startNetconfDeviceContext. Being protected, it can be
 * overridden by a subclass to supply a different context instance.
 *
 * @param setup setup assembled for the node
 * @param serviceGroupIdent cluster singleton service group of the node
 * @param actorResponseWaitTime timeout handed to the context
 * @param deviceActionFact device action factory supplied by the caller
 * @return a new context built from the arguments and this manager's mount point service
 */
212 protected NetconfTopologyContext newNetconfTopologyContext(final NetconfTopologySetup setup,
213 final ServiceGroupIdentifier serviceGroupIdent, final Timeout actorResponseWaitTime,
214 final DeviceActionFactory deviceActionFact) {
// NOTE(review): the remaining constructor arguments are not visible here; presumably
// deviceActionFact is passed last. Confirm against the full file.
215 return new NetconfTopologyContext(setup, serviceGroupIdent, actorResponseWaitTime, mountPointService,
/**
 * Shuts the manager down: unregisters the data tree change listener, then closes every stored context
 * and every stored cluster singleton registration.
 */
220 public void close() {
// Stop receiving changes first; resetting the field makes a repeated close() skip this step.
221 if (dataChangeListenerRegistration != null) {
222 dataChangeListenerRegistration.close();
223 dataChangeListenerRegistration = null;
// NOTE(review): contexts are closed before their registrations here, whereas
// stopNetconfDeviceContext closes the registration first. Confirm the difference is intentional.
226 contexts.values().forEach(NetconfTopologyManager::close);
227 clusterRegistrations.values().forEach(NetconfTopologyManager::close);
// NOTE(review): only the clearing of clusterRegistrations is visible here; confirm that contexts
// is cleared as well so that closed contexts are not kept.
230 clusterRegistrations.clear();
233 @SuppressWarnings("checkstyle:IllegalCatch")
234 private static void close(final AutoCloseable closeable) {
237 } catch (Exception e) {
238 LOG.warn("Error closing {}", closeable, e);
243 * Sets the private key path from location specified in configuration file using blueprint.
245 public void setPrivateKeyPath(final String privateKeyPath) {
246 this.privateKeyPath = privateKeyPath;
250 * Sets the private key passphrase from location specified in configuration file using blueprint.
252 public void setPrivateKeyPassphrase(final String privateKeyPassphrase) {
253 this.privateKeyPassphrase = privateKeyPassphrase;
256 private ListenerRegistration<NetconfTopologyManager> registerDataTreeChangeListener() {
257 final WriteTransaction wtx = dataBroker.newWriteOnlyTransaction();
258 initTopology(wtx, LogicalDatastoreType.CONFIGURATION);
259 initTopology(wtx, LogicalDatastoreType.OPERATIONAL);
260 wtx.commit().addCallback(new FutureCallback<CommitInfo>() {
262 public void onSuccess(final CommitInfo result) {
263 LOG.debug("topology initialization successful");
267 public void onFailure(final Throwable throwable) {
268 LOG.error("Unable to initialize netconf-topology", throwable);
270 }, MoreExecutors.directExecutor());
272 LOG.debug("Registering datastore listener");
273 return dataBroker.registerDataTreeChangeListener(DataTreeIdentifier.create(LogicalDatastoreType.CONFIGURATION,
274 NetconfTopologyUtils.createTopologyListPath(topologyId).child(Node.class)), this);
277 private void initTopology(final WriteTransaction wtx, final LogicalDatastoreType datastoreType) {
278 final NetworkTopology networkTopology = new NetworkTopologyBuilder().build();
279 final InstanceIdentifier<NetworkTopology> networkTopologyId =
280 InstanceIdentifier.builder(NetworkTopology.class).build();
281 wtx.merge(datastoreType, networkTopologyId, networkTopology);
282 final Topology topology = new TopologyBuilder().setTopologyId(new TopologyId(topologyId)).build();
283 wtx.merge(datastoreType, networkTopologyId.child(Topology.class,
284 new TopologyKey(new TopologyId(topologyId))), topology);
/**
 * Assembles the setup for one node from the node's data and the services held by this manager.
 *
 * @param instanceIdentifier path of the node in the datastore
 * @param node node data the setup is built for
 * @return setup to create or refresh the node's context with
 */
287 private NetconfTopologySetup createSetup(final InstanceIdentifier<Node> instanceIdentifier, final Node node) {
// NOTE(review): the augmentation is not null-checked here. startNetconfDeviceContext checks it before
// calling, refreshNetconfDeviceContext does not; confirm a node without it cannot reach this point.
288 final NetconfNode netconfNode = node.augmentation(NetconfNode.class);
289 final RemoteDeviceId deviceId = NetconfTopologyUtils.createRemoteDeviceId(node.getNodeId(), netconfNode);
291 final NetconfTopologySetupBuilder builder = NetconfTopologySetupBuilder.create()
292 .setClusterSingletonServiceProvider(clusterSingletonServiceProvider)
293 .setDataBroker(dataBroker)
294 .setInstanceIdentifier(instanceIdentifier)
295 .setRpcProviderRegistry(rpcProviderRegistry)
296 .setActionProviderRegistry(actionProviderRegistry)
298 .setActorSystem(actorSystem)
299 .setEventExecutor(eventExecutor)
300 .setKeepaliveExecutor(keepaliveExecutor)
301 .setProcessingExecutor(processingExecutor)
302 .setTopologyId(topologyId)
303 .setNetconfClientDispatcher(clientDispatcher)
304 .setSchemaResourceDTO(resourceManager.getSchemaResources(netconfNode, deviceId))
305 .setIdleTimeout(writeTxIdleTimeout)
306 .setPrivateKeyPath(privateKeyPath)
307 .setPrivateKeyPassphrase(privateKeyPassphrase)
308 .setEncryptionService(encryptionService);
310 return builder.build();