2 * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.netconf.topology.singleton.impl;
11 import akka.actor.ActorSystem;
12 import akka.util.Timeout;
13 import com.google.common.annotations.VisibleForTesting;
14 import com.google.common.base.Preconditions;
15 import com.google.common.util.concurrent.FutureCallback;
16 import com.google.common.util.concurrent.Futures;
17 import com.google.common.util.concurrent.MoreExecutors;
18 import io.netty.util.concurrent.EventExecutor;
19 import java.util.Collection;
21 import java.util.concurrent.ConcurrentHashMap;
22 import java.util.concurrent.TimeUnit;
23 import javax.annotation.Nonnull;
24 import org.opendaylight.aaa.encrypt.AAAEncryptionService;
25 import org.opendaylight.controller.cluster.ActorSystemProvider;
26 import org.opendaylight.controller.config.threadpool.ScheduledThreadPool;
27 import org.opendaylight.controller.config.threadpool.ThreadPool;
28 import org.opendaylight.controller.md.sal.binding.api.ClusteredDataTreeChangeListener;
29 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
30 import org.opendaylight.controller.md.sal.binding.api.DataObjectModification;
31 import org.opendaylight.controller.md.sal.binding.api.DataTreeIdentifier;
32 import org.opendaylight.controller.md.sal.binding.api.DataTreeModification;
33 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
34 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
35 import org.opendaylight.controller.md.sal.dom.api.DOMMountPointService;
36 import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
37 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
38 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceRegistration;
39 import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier;
40 import org.opendaylight.netconf.client.NetconfClientDispatcher;
41 import org.opendaylight.netconf.topology.singleton.api.NetconfTopologySingletonService;
42 import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup;
43 import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup.NetconfTopologySetupBuilder;
44 import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologyUtils;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNode;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.topology.singleton.config.rev170419.Config;
47 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology;
48 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopologyBuilder;
49 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
50 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.TopologyId;
51 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
52 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyBuilder;
53 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey;
54 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
55 import org.opendaylight.yangtools.concepts.ListenerRegistration;
56 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
57 import org.slf4j.Logger;
58 import org.slf4j.LoggerFactory;
59 import scala.concurrent.duration.Duration;
61 public class NetconfTopologyManager
62 implements ClusteredDataTreeChangeListener<Node>, NetconfTopologySingletonService, AutoCloseable {
// Class-wide logger.
private static final Logger LOG = LoggerFactory.getLogger(NetconfTopologyManager.class);

// Topology context of each node handled by this manager, keyed by the node's instance identifier.
// An entry is added after the context has been registered as a cluster singleton service and is
// removed again when the node's configuration is deleted.
private final Map<InstanceIdentifier<Node>, NetconfTopologyContext> contexts = new ConcurrentHashMap<>();
// Cluster singleton registration obtained for each context, keyed by the same instance identifier.
private final Map<InstanceIdentifier<Node>, ClusterSingletonServiceRegistration>
        clusterRegistrations = new ConcurrentHashMap<>();

// Collaborators received through the constructor. Apart from mountPointService they are copied into
// the NetconfTopologySetup built for every node.
private final DataBroker dataBroker;
private final RpcProviderRegistry rpcProviderRegistry;
private final ClusterSingletonServiceProvider clusterSingletonServiceProvider;
private final ScheduledThreadPool keepaliveExecutor;
private final ThreadPool processingExecutor;
// Obtained from the ActorSystemProvider passed to the constructor.
private final ActorSystem actorSystem;
private final EventExecutor eventExecutor;
private final NetconfClientDispatcher clientDispatcher;
// Id of the topology whose nodes are listened to and which is initialized in both datastores.
private final String topologyId;
// Write transaction idle timeout; the configured number is interpreted as seconds.
private final Duration writeTxIdleTimeout;
// Handed to every NetconfTopologyContext created by newNetconfTopologyContext.
private final DOMMountPointService mountPointService;
private final AAAEncryptionService encryptionService;
// Registration of this object as data tree change listener; reset to null once closed.
private ListenerRegistration<NetconfTopologyManager> dataChangeListenerRegistration;
// Optional values injected through the setters below and copied into every NetconfTopologySetup.
private String privateKeyPath;
private String privateKeyPassphrase;
/**
 * Creates the manager and stores its collaborators. No datastore access and no listener
 * registration happens here.
 *
 * <p>Every null check names the offending argument, so a wiring mistake is identified by the
 * exception message. {@code config} is checked explicitly before it is read.
 *
 * @param dataBroker broker used to initialize the topology and to register the listener
 * @param rpcProviderRegistry registry passed on to each node setup
 * @param clusterSingletonServiceProvider provider the per-node contexts are registered with
 * @param keepaliveExecutor executor passed on to each node setup
 * @param processingExecutor executor passed on to each node setup
 * @param actorSystemProvider source of the actor system passed on to each node setup
 * @param eventExecutor executor passed on to each node setup
 * @param clientDispatcher dispatcher passed on to each node setup
 * @param topologyId id of the topology managed by this instance
 * @param config configuration supplying the write transaction idle timeout (in seconds)
 * @param mountPointService service handed to every created topology context; stored as given,
 *                          without a null check
 * @param encryptionService encryption service passed on to each node setup
 * @throws NullPointerException if any argument other than {@code mountPointService} is null
 */
public NetconfTopologyManager(final DataBroker dataBroker, final RpcProviderRegistry rpcProviderRegistry,
                              final ClusterSingletonServiceProvider clusterSingletonServiceProvider,
                              final ScheduledThreadPool keepaliveExecutor, final ThreadPool processingExecutor,
                              final ActorSystemProvider actorSystemProvider,
                              final EventExecutor eventExecutor, final NetconfClientDispatcher clientDispatcher,
                              final String topologyId, final Config config,
                              final DOMMountPointService mountPointService,
                              final AAAEncryptionService encryptionService) {

    this.dataBroker = Preconditions.checkNotNull(dataBroker, "dataBroker");
    this.rpcProviderRegistry = Preconditions.checkNotNull(rpcProviderRegistry, "rpcProviderRegistry");
    this.clusterSingletonServiceProvider =
            Preconditions.checkNotNull(clusterSingletonServiceProvider, "clusterSingletonServiceProvider");
    this.keepaliveExecutor = Preconditions.checkNotNull(keepaliveExecutor, "keepaliveExecutor");
    this.processingExecutor = Preconditions.checkNotNull(processingExecutor, "processingExecutor");
    this.actorSystem = Preconditions.checkNotNull(actorSystemProvider, "actorSystemProvider").getActorSystem();
    this.eventExecutor = Preconditions.checkNotNull(eventExecutor, "eventExecutor");
    this.clientDispatcher = Preconditions.checkNotNull(clientDispatcher, "clientDispatcher");
    this.topologyId = Preconditions.checkNotNull(topologyId, "topologyId");
    // Previously a null config surfaced as an anonymous NPE from the getter call below.
    Preconditions.checkNotNull(config, "config");
    this.writeTxIdleTimeout = Duration.apply(config.getWriteTransactionIdleTimeout(), TimeUnit.SECONDS);
    // NOTE(review): mountPointService was never null-checked; kept that way - confirm null is allowed.
    this.mountPointService = mountPointService;
    this.encryptionService = Preconditions.checkNotNull(encryptionService, "encryptionService");
}
110 // Blueprint init method
112 dataChangeListenerRegistration = registerDataTreeChangeListener();
116 public void onDataTreeChanged(@Nonnull final Collection<DataTreeModification<Node>> changes) {
117 for (final DataTreeModification<Node> change : changes) {
118 final DataObjectModification<Node> rootNode = change.getRootNode();
119 final InstanceIdentifier<Node> dataModifIdent = change.getRootPath().getRootIdentifier();
120 final NodeId nodeId = NetconfTopologyUtils.getNodeId(rootNode.getIdentifier());
121 switch (rootNode.getModificationType()) {
122 case SUBTREE_MODIFIED:
123 LOG.debug("Config for node {} updated", nodeId);
124 refreshNetconfDeviceContext(dataModifIdent, rootNode.getDataAfter());
127 if (contexts.containsKey(dataModifIdent)) {
128 LOG.debug("RemoteDevice{{}} was already configured, reconfiguring node...", nodeId);
129 refreshNetconfDeviceContext(dataModifIdent, rootNode.getDataAfter());
131 LOG.debug("Config for node {} created", nodeId);
132 startNetconfDeviceContext(dataModifIdent, rootNode.getDataAfter());
136 LOG.debug("Config for node {} deleted", nodeId);
137 stopNetconfDeviceContext(dataModifIdent);
140 LOG.warn("Unknown operation for {}.", nodeId);
145 private void refreshNetconfDeviceContext(final InstanceIdentifier<Node> instanceIdentifier, final Node node) {
146 final NetconfTopologyContext context = contexts.get(instanceIdentifier);
147 context.refresh(createSetup(instanceIdentifier, node));
// ClusterSingletonServiceProvider#registerClusterSingletonService throws a RuntimeException if there are
// problems with the registration, and the client has to deal with it. The only thing we can do when this error
// occurs is to retry the registration several times and log the error.
// TODO: change to a specific documented exception once ClusterSingletonServiceProvider declares one
154 @SuppressWarnings("checkstyle:IllegalCatch")
155 private void startNetconfDeviceContext(final InstanceIdentifier<Node> instanceIdentifier, final Node node) {
156 final NetconfNode netconfNode = node.augmentation(NetconfNode.class);
157 Preconditions.checkNotNull(netconfNode);
158 Preconditions.checkNotNull(netconfNode.getHost());
159 Preconditions.checkNotNull(netconfNode.getHost().getIpAddress());
161 final Timeout actorResponseWaitTime = new Timeout(Duration.create(netconfNode.getActorResponseWaitTime(),
164 final ServiceGroupIdentifier serviceGroupIdent =
165 ServiceGroupIdentifier.create(instanceIdentifier.toString());
167 final NetconfTopologyContext newNetconfTopologyContext = newNetconfTopologyContext(
168 createSetup(instanceIdentifier, node), serviceGroupIdent, actorResponseWaitTime);
173 final ClusterSingletonServiceRegistration clusterSingletonServiceRegistration =
174 clusterSingletonServiceProvider.registerClusterSingletonService(newNetconfTopologyContext);
175 clusterRegistrations.put(instanceIdentifier, clusterSingletonServiceRegistration);
176 contexts.put(instanceIdentifier, newNetconfTopologyContext);
178 } catch (final RuntimeException e) {
179 LOG.warn("Unable to register cluster singleton service {}, trying again", newNetconfTopologyContext, e);
182 LOG.error("Unable to register cluster singleton service {} - done trying, closing topology context",
183 newNetconfTopologyContext, e);
184 close(newNetconfTopologyContext);
191 private void stopNetconfDeviceContext(final InstanceIdentifier<Node> instanceIdentifier) {
192 final NetconfTopologyContext netconfTopologyContext = contexts.remove(instanceIdentifier);
193 if (netconfTopologyContext != null) {
194 close(clusterRegistrations.remove(instanceIdentifier));
195 close(netconfTopologyContext);
200 protected NetconfTopologyContext newNetconfTopologyContext(NetconfTopologySetup setup,
201 ServiceGroupIdentifier serviceGroupIdent, Timeout actorResponseWaitTime) {
202 return new NetconfTopologyContext(setup, serviceGroupIdent, actorResponseWaitTime, mountPointService);
206 public void close() {
207 if (dataChangeListenerRegistration != null) {
208 dataChangeListenerRegistration.close();
209 dataChangeListenerRegistration = null;
212 contexts.values().forEach(netconfTopologyContext -> close(netconfTopologyContext));
214 clusterRegistrations.values().forEach(
215 clusterSingletonServiceRegistration -> close(clusterSingletonServiceRegistration));
218 clusterRegistrations.clear();
221 @SuppressWarnings("checkstyle:IllegalCatch")
222 private void close(AutoCloseable closeable) {
225 } catch (Exception e) {
226 LOG.warn("Error closing {}", closeable, e);
/**
 * Sets the private key path from location specified in configuration file using blueprint.
 *
 * <p>The value is stored as given and copied into every node setup created afterwards.
 *
 * @param privateKeyPath path of the private key
 */
public void setPrivateKeyPath(final String privateKeyPath) {
    this.privateKeyPath = privateKeyPath;
}
/**
 * Sets the private key passphrase from location specified in configuration file using blueprint.
 *
 * <p>The value is stored as given and copied into every node setup created afterwards.
 *
 * @param privateKeyPassphrase passphrase of the private key
 */
public void setPrivateKeyPassphrase(final String privateKeyPassphrase) {
    this.privateKeyPassphrase = privateKeyPassphrase;
}
244 private ListenerRegistration<NetconfTopologyManager> registerDataTreeChangeListener() {
245 final WriteTransaction wtx = dataBroker.newWriteOnlyTransaction();
246 initTopology(wtx, LogicalDatastoreType.CONFIGURATION);
247 initTopology(wtx, LogicalDatastoreType.OPERATIONAL);
248 Futures.addCallback(wtx.submit(), new FutureCallback<Void>() {
250 public void onSuccess(final Void result) {
251 LOG.debug("topology initialization successful");
255 public void onFailure(@Nonnull final Throwable throwable) {
256 LOG.error("Unable to initialize netconf-topology, {}", throwable);
258 }, MoreExecutors.directExecutor());
260 LOG.debug("Registering datastore listener");
261 return dataBroker.registerDataTreeChangeListener(
262 new DataTreeIdentifier<>(LogicalDatastoreType.CONFIGURATION,
263 NetconfTopologyUtils.createTopologyListPath(topologyId).child(Node.class)), this);
266 private void initTopology(final WriteTransaction wtx, final LogicalDatastoreType datastoreType) {
267 final NetworkTopology networkTopology = new NetworkTopologyBuilder().build();
268 final InstanceIdentifier<NetworkTopology> networkTopologyId =
269 InstanceIdentifier.builder(NetworkTopology.class).build();
270 wtx.merge(datastoreType, networkTopologyId, networkTopology);
271 final Topology topology = new TopologyBuilder().setTopologyId(new TopologyId(topologyId)).build();
272 wtx.merge(datastoreType, networkTopologyId.child(Topology.class,
273 new TopologyKey(new TopologyId(topologyId))), topology);
276 private NetconfTopologySetup createSetup(final InstanceIdentifier<Node> instanceIdentifier, final Node node) {
277 final NetconfTopologySetupBuilder builder = NetconfTopologySetupBuilder.create()
278 .setClusterSingletonServiceProvider(clusterSingletonServiceProvider)
279 .setDataBroker(dataBroker)
280 .setInstanceIdentifier(instanceIdentifier)
281 .setRpcProviderRegistry(rpcProviderRegistry)
283 .setActorSystem(actorSystem)
284 .setEventExecutor(eventExecutor)
285 .setKeepaliveExecutor(keepaliveExecutor)
286 .setProcessingExecutor(processingExecutor)
287 .setTopologyId(topologyId)
288 .setNetconfClientDispatcher(clientDispatcher)
289 .setSchemaResourceDTO(NetconfTopologyUtils.setupSchemaCacheDTO(node))
290 .setIdleTimeout(writeTxIdleTimeout)
291 .setPrivateKeyPath(privateKeyPath)
292 .setPrivateKeyPassphrase(privateKeyPassphrase)
293 .setEncryptionService(encryptionService);
295 return builder.build();