2 * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.netconf.topology.singleton.impl;
10 import static java.util.Objects.requireNonNull;
12 import akka.actor.ActorSystem;
13 import akka.util.Timeout;
14 import com.google.common.annotations.VisibleForTesting;
15 import com.google.common.util.concurrent.FutureCallback;
16 import com.google.common.util.concurrent.ListeningExecutorService;
17 import com.google.common.util.concurrent.MoreExecutors;
18 import io.netty.util.concurrent.EventExecutor;
19 import java.util.Collection;
21 import java.util.concurrent.ConcurrentHashMap;
22 import java.util.concurrent.ScheduledExecutorService;
23 import java.util.concurrent.TimeUnit;
24 import org.opendaylight.aaa.encrypt.AAAEncryptionService;
25 import org.opendaylight.controller.cluster.ActorSystemProvider;
26 import org.opendaylight.controller.config.threadpool.ScheduledThreadPool;
27 import org.opendaylight.controller.config.threadpool.ThreadPool;
28 import org.opendaylight.mdsal.binding.api.ClusteredDataTreeChangeListener;
29 import org.opendaylight.mdsal.binding.api.DataBroker;
30 import org.opendaylight.mdsal.binding.api.DataObjectModification;
31 import org.opendaylight.mdsal.binding.api.DataTreeIdentifier;
32 import org.opendaylight.mdsal.binding.api.DataTreeModification;
33 import org.opendaylight.mdsal.binding.api.WriteTransaction;
34 import org.opendaylight.mdsal.common.api.CommitInfo;
35 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
36 import org.opendaylight.mdsal.dom.api.DOMMountPointService;
37 import org.opendaylight.mdsal.dom.api.DOMRpcProviderService;
38 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
39 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceRegistration;
40 import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier;
41 import org.opendaylight.netconf.client.NetconfClientDispatcher;
42 import org.opendaylight.netconf.topology.singleton.api.NetconfTopologySingletonService;
43 import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup;
44 import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup.NetconfTopologySetupBuilder;
45 import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologyUtils;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNode;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.topology.singleton.config.rev170419.Config;
48 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology;
49 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopologyBuilder;
50 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
51 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.TopologyId;
52 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
53 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyBuilder;
54 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey;
55 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
56 import org.opendaylight.yangtools.concepts.ListenerRegistration;
57 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
58 import org.slf4j.Logger;
59 import org.slf4j.LoggerFactory;
60 import scala.concurrent.duration.Duration;
62 public class NetconfTopologyManager
63 implements ClusteredDataTreeChangeListener<Node>, NetconfTopologySingletonService, AutoCloseable {
65 private static final Logger LOG = LoggerFactory.getLogger(NetconfTopologyManager.class);
67 private final Map<InstanceIdentifier<Node>, NetconfTopologyContext> contexts = new ConcurrentHashMap<>();
68 private final Map<InstanceIdentifier<Node>, ClusterSingletonServiceRegistration>
69 clusterRegistrations = new ConcurrentHashMap<>();
71 private final DataBroker dataBroker;
72 private final DOMRpcProviderService rpcProviderRegistry;
73 private final ClusterSingletonServiceProvider clusterSingletonServiceProvider;
74 private final ScheduledExecutorService keepaliveExecutor;
75 private final ListeningExecutorService processingExecutor;
76 private final ActorSystem actorSystem;
77 private final EventExecutor eventExecutor;
78 private final NetconfClientDispatcher clientDispatcher;
79 private final String topologyId;
80 private final Duration writeTxIdleTimeout;
81 private final DOMMountPointService mountPointService;
82 private final AAAEncryptionService encryptionService;
83 private ListenerRegistration<NetconfTopologyManager> dataChangeListenerRegistration;
84 private String privateKeyPath;
85 private String privateKeyPassphrase;
87 public NetconfTopologyManager(final DataBroker dataBroker, final DOMRpcProviderService rpcProviderRegistry,
88 final ClusterSingletonServiceProvider clusterSingletonServiceProvider,
89 final ScheduledThreadPool keepaliveExecutor, final ThreadPool processingExecutor,
90 final ActorSystemProvider actorSystemProvider,
91 final EventExecutor eventExecutor, final NetconfClientDispatcher clientDispatcher,
92 final String topologyId, final Config config,
93 final DOMMountPointService mountPointService,
94 final AAAEncryptionService encryptionService) {
96 this.dataBroker = requireNonNull(dataBroker);
97 this.rpcProviderRegistry = requireNonNull(rpcProviderRegistry);
98 this.clusterSingletonServiceProvider = requireNonNull(clusterSingletonServiceProvider);
99 this.keepaliveExecutor = keepaliveExecutor.getExecutor();
100 this.processingExecutor = MoreExecutors.listeningDecorator(processingExecutor.getExecutor());
101 this.actorSystem = requireNonNull(actorSystemProvider).getActorSystem();
102 this.eventExecutor = requireNonNull(eventExecutor);
103 this.clientDispatcher = requireNonNull(clientDispatcher);
104 this.topologyId = requireNonNull(topologyId);
105 this.writeTxIdleTimeout = Duration.apply(config.getWriteTransactionIdleTimeout(), TimeUnit.SECONDS);
106 this.mountPointService = mountPointService;
107 this.encryptionService = requireNonNull(encryptionService);
111 // Blueprint init method
113 dataChangeListenerRegistration = registerDataTreeChangeListener();
117 public void onDataTreeChanged(final Collection<DataTreeModification<Node>> changes) {
118 for (final DataTreeModification<Node> change : changes) {
119 final DataObjectModification<Node> rootNode = change.getRootNode();
120 final InstanceIdentifier<Node> dataModifIdent = change.getRootPath().getRootIdentifier();
121 final NodeId nodeId = NetconfTopologyUtils.getNodeId(rootNode.getIdentifier());
122 switch (rootNode.getModificationType()) {
123 case SUBTREE_MODIFIED:
124 LOG.debug("Config for node {} updated", nodeId);
125 refreshNetconfDeviceContext(dataModifIdent, rootNode.getDataAfter());
128 if (contexts.containsKey(dataModifIdent)) {
129 LOG.debug("RemoteDevice{{}} was already configured, reconfiguring node...", nodeId);
130 refreshNetconfDeviceContext(dataModifIdent, rootNode.getDataAfter());
132 LOG.debug("Config for node {} created", nodeId);
133 startNetconfDeviceContext(dataModifIdent, rootNode.getDataAfter());
137 LOG.debug("Config for node {} deleted", nodeId);
138 stopNetconfDeviceContext(dataModifIdent);
141 LOG.warn("Unknown operation for {}.", nodeId);
146 private void refreshNetconfDeviceContext(final InstanceIdentifier<Node> instanceIdentifier, final Node node) {
147 final NetconfTopologyContext context = contexts.get(instanceIdentifier);
148 context.refresh(createSetup(instanceIdentifier, node));
151 // ClusterSingletonServiceRegistration registerClusterSingletonService method throws a Runtime exception if there
152 // are problems with registration and client has to deal with it. Only thing we can do if this error occurs is to
153 // retry registration several times and log the error.
154 // TODO change to a specific documented Exception when changed in ClusterSingletonServiceProvider
155 @SuppressWarnings("checkstyle:IllegalCatch")
156 private void startNetconfDeviceContext(final InstanceIdentifier<Node> instanceIdentifier, final Node node) {
157 final NetconfNode netconfNode = node.augmentation(NetconfNode.class);
158 requireNonNull(netconfNode);
159 requireNonNull(netconfNode.getHost());
160 requireNonNull(netconfNode.getHost().getIpAddress());
162 final Timeout actorResponseWaitTime = new Timeout(Duration.create(netconfNode.getActorResponseWaitTime(),
165 final ServiceGroupIdentifier serviceGroupIdent =
166 ServiceGroupIdentifier.create(instanceIdentifier.toString());
168 final NetconfTopologyContext newNetconfTopologyContext = newNetconfTopologyContext(
169 createSetup(instanceIdentifier, node), serviceGroupIdent, actorResponseWaitTime);
174 final ClusterSingletonServiceRegistration clusterSingletonServiceRegistration =
175 clusterSingletonServiceProvider.registerClusterSingletonService(newNetconfTopologyContext);
176 clusterRegistrations.put(instanceIdentifier, clusterSingletonServiceRegistration);
177 contexts.put(instanceIdentifier, newNetconfTopologyContext);
179 } catch (final RuntimeException e) {
180 LOG.warn("Unable to register cluster singleton service {}, trying again", newNetconfTopologyContext, e);
183 LOG.error("Unable to register cluster singleton service {} - done trying, closing topology context",
184 newNetconfTopologyContext, e);
185 close(newNetconfTopologyContext);
192 private void stopNetconfDeviceContext(final InstanceIdentifier<Node> instanceIdentifier) {
193 final NetconfTopologyContext netconfTopologyContext = contexts.remove(instanceIdentifier);
194 if (netconfTopologyContext != null) {
195 close(clusterRegistrations.remove(instanceIdentifier));
196 close(netconfTopologyContext);
201 protected NetconfTopologyContext newNetconfTopologyContext(final NetconfTopologySetup setup,
202 final ServiceGroupIdentifier serviceGroupIdent, final Timeout actorResponseWaitTime) {
203 return new NetconfTopologyContext(setup, serviceGroupIdent, actorResponseWaitTime, mountPointService);
207 public void close() {
208 if (dataChangeListenerRegistration != null) {
209 dataChangeListenerRegistration.close();
210 dataChangeListenerRegistration = null;
213 contexts.values().forEach(NetconfTopologyManager::close);
214 clusterRegistrations.values().forEach(NetconfTopologyManager::close);
217 clusterRegistrations.clear();
220 @SuppressWarnings("checkstyle:IllegalCatch")
221 private static void close(final AutoCloseable closeable) {
224 } catch (Exception e) {
225 LOG.warn("Error closing {}", closeable, e);
230 * Sets the private key path from location specified in configuration file using blueprint.
232 public void setPrivateKeyPath(final String privateKeyPath) {
233 this.privateKeyPath = privateKeyPath;
237 * Sets the private key passphrase from location specified in configuration file using blueprint.
239 public void setPrivateKeyPassphrase(final String privateKeyPassphrase) {
240 this.privateKeyPassphrase = privateKeyPassphrase;
243 private ListenerRegistration<NetconfTopologyManager> registerDataTreeChangeListener() {
244 final WriteTransaction wtx = dataBroker.newWriteOnlyTransaction();
245 initTopology(wtx, LogicalDatastoreType.CONFIGURATION);
246 initTopology(wtx, LogicalDatastoreType.OPERATIONAL);
247 wtx.commit().addCallback(new FutureCallback<CommitInfo>() {
249 public void onSuccess(final CommitInfo result) {
250 LOG.debug("topology initialization successful");
254 public void onFailure(final Throwable throwable) {
255 LOG.error("Unable to initialize netconf-topology", throwable);
257 }, MoreExecutors.directExecutor());
259 LOG.debug("Registering datastore listener");
260 return dataBroker.registerDataTreeChangeListener(DataTreeIdentifier.create(LogicalDatastoreType.CONFIGURATION,
261 NetconfTopologyUtils.createTopologyListPath(topologyId).child(Node.class)), this);
264 private void initTopology(final WriteTransaction wtx, final LogicalDatastoreType datastoreType) {
265 final NetworkTopology networkTopology = new NetworkTopologyBuilder().build();
266 final InstanceIdentifier<NetworkTopology> networkTopologyId =
267 InstanceIdentifier.builder(NetworkTopology.class).build();
268 wtx.merge(datastoreType, networkTopologyId, networkTopology);
269 final Topology topology = new TopologyBuilder().setTopologyId(new TopologyId(topologyId)).build();
270 wtx.merge(datastoreType, networkTopologyId.child(Topology.class,
271 new TopologyKey(new TopologyId(topologyId))), topology);
274 private NetconfTopologySetup createSetup(final InstanceIdentifier<Node> instanceIdentifier, final Node node) {
275 final NetconfTopologySetupBuilder builder = NetconfTopologySetupBuilder.create()
276 .setClusterSingletonServiceProvider(clusterSingletonServiceProvider)
277 .setDataBroker(dataBroker)
278 .setInstanceIdentifier(instanceIdentifier)
279 .setRpcProviderRegistry(rpcProviderRegistry)
281 .setActorSystem(actorSystem)
282 .setEventExecutor(eventExecutor)
283 .setKeepaliveExecutor(keepaliveExecutor)
284 .setProcessingExecutor(processingExecutor)
285 .setTopologyId(topologyId)
286 .setNetconfClientDispatcher(clientDispatcher)
287 .setSchemaResourceDTO(NetconfTopologyUtils.setupSchemaCacheDTO(node))
288 .setIdleTimeout(writeTxIdleTimeout)
289 .setPrivateKeyPath(privateKeyPath)
290 .setPrivateKeyPassphrase(privateKeyPassphrase)
291 .setEncryptionService(encryptionService);
293 return builder.build();