/*
 * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
9 package org.opendaylight.netconf.topology.singleton.impl;
import akka.actor.ActorSystem;
import akka.util.Timeout;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.MoreExecutors;
import io.netty.util.concurrent.EventExecutor;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nonnull;
import org.opendaylight.aaa.encrypt.AAAEncryptionService;
import org.opendaylight.controller.cluster.ActorSystemProvider;
import org.opendaylight.controller.config.threadpool.ScheduledThreadPool;
import org.opendaylight.controller.config.threadpool.ThreadPool;
import org.opendaylight.controller.md.sal.binding.api.ClusteredDataTreeChangeListener;
import org.opendaylight.controller.md.sal.binding.api.DataBroker;
import org.opendaylight.controller.md.sal.binding.api.DataObjectModification;
import org.opendaylight.controller.md.sal.binding.api.DataTreeIdentifier;
import org.opendaylight.controller.md.sal.binding.api.DataTreeModification;
import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.opendaylight.controller.md.sal.dom.api.DOMMountPointService;
import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
import org.opendaylight.mdsal.common.api.CommitInfo;
import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceRegistration;
import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier;
import org.opendaylight.netconf.client.NetconfClientDispatcher;
import org.opendaylight.netconf.topology.singleton.api.NetconfTopologySingletonService;
import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup;
import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup.NetconfTopologySetupBuilder;
import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologyUtils;
import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNode;
import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.topology.singleton.config.rev170419.Config;
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology;
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopologyBuilder;
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.TopologyId;
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyBuilder;
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey;
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.duration.Duration;
61 public class NetconfTopologyManager
62 implements ClusteredDataTreeChangeListener<Node>, NetconfTopologySingletonService, AutoCloseable {
64 private static final Logger LOG = LoggerFactory.getLogger(NetconfTopologyManager.class);
66 private final Map<InstanceIdentifier<Node>, NetconfTopologyContext> contexts = new ConcurrentHashMap<>();
67 private final Map<InstanceIdentifier<Node>, ClusterSingletonServiceRegistration>
68 clusterRegistrations = new ConcurrentHashMap<>();
70 private final DataBroker dataBroker;
71 private final RpcProviderRegistry rpcProviderRegistry;
72 private final ClusterSingletonServiceProvider clusterSingletonServiceProvider;
73 private final ScheduledThreadPool keepaliveExecutor;
74 private final ThreadPool processingExecutor;
75 private final ActorSystem actorSystem;
76 private final EventExecutor eventExecutor;
77 private final NetconfClientDispatcher clientDispatcher;
78 private final String topologyId;
79 private final Duration writeTxIdleTimeout;
80 private final DOMMountPointService mountPointService;
81 private final AAAEncryptionService encryptionService;
82 private ListenerRegistration<NetconfTopologyManager> dataChangeListenerRegistration;
83 private String privateKeyPath;
84 private String privateKeyPassphrase;
86 public NetconfTopologyManager(final DataBroker dataBroker, final RpcProviderRegistry rpcProviderRegistry,
87 final ClusterSingletonServiceProvider clusterSingletonServiceProvider,
88 final ScheduledThreadPool keepaliveExecutor, final ThreadPool processingExecutor,
89 final ActorSystemProvider actorSystemProvider,
90 final EventExecutor eventExecutor, final NetconfClientDispatcher clientDispatcher,
91 final String topologyId, final Config config,
92 final DOMMountPointService mountPointService,
93 final AAAEncryptionService encryptionService) {
95 this.dataBroker = Preconditions.checkNotNull(dataBroker);
96 this.rpcProviderRegistry = Preconditions.checkNotNull(rpcProviderRegistry);
97 this.clusterSingletonServiceProvider = Preconditions.checkNotNull(clusterSingletonServiceProvider);
98 this.keepaliveExecutor = Preconditions.checkNotNull(keepaliveExecutor);
99 this.processingExecutor = Preconditions.checkNotNull(processingExecutor);
100 this.actorSystem = Preconditions.checkNotNull(actorSystemProvider).getActorSystem();
101 this.eventExecutor = Preconditions.checkNotNull(eventExecutor);
102 this.clientDispatcher = Preconditions.checkNotNull(clientDispatcher);
103 this.topologyId = Preconditions.checkNotNull(topologyId);
104 this.writeTxIdleTimeout = Duration.apply(config.getWriteTransactionIdleTimeout(), TimeUnit.SECONDS);
105 this.mountPointService = mountPointService;
106 this.encryptionService = Preconditions.checkNotNull(encryptionService);
110 // Blueprint init method
112 dataChangeListenerRegistration = registerDataTreeChangeListener();
116 public void onDataTreeChanged(@Nonnull final Collection<DataTreeModification<Node>> changes) {
117 for (final DataTreeModification<Node> change : changes) {
118 final DataObjectModification<Node> rootNode = change.getRootNode();
119 final InstanceIdentifier<Node> dataModifIdent = change.getRootPath().getRootIdentifier();
120 final NodeId nodeId = NetconfTopologyUtils.getNodeId(rootNode.getIdentifier());
121 switch (rootNode.getModificationType()) {
122 case SUBTREE_MODIFIED:
123 LOG.debug("Config for node {} updated", nodeId);
124 refreshNetconfDeviceContext(dataModifIdent, rootNode.getDataAfter());
127 if (contexts.containsKey(dataModifIdent)) {
128 LOG.debug("RemoteDevice{{}} was already configured, reconfiguring node...", nodeId);
129 refreshNetconfDeviceContext(dataModifIdent, rootNode.getDataAfter());
131 LOG.debug("Config for node {} created", nodeId);
132 startNetconfDeviceContext(dataModifIdent, rootNode.getDataAfter());
136 LOG.debug("Config for node {} deleted", nodeId);
137 stopNetconfDeviceContext(dataModifIdent);
140 LOG.warn("Unknown operation for {}.", nodeId);
145 private void refreshNetconfDeviceContext(final InstanceIdentifier<Node> instanceIdentifier, final Node node) {
146 final NetconfTopologyContext context = contexts.get(instanceIdentifier);
147 context.refresh(createSetup(instanceIdentifier, node));
150 // ClusterSingletonServiceRegistration registerClusterSingletonService method throws a Runtime exception if there
151 // are problems with registration and client has to deal with it. Only thing we can do if this error occurs is to
152 // retry registration several times and log the error.
153 // TODO change to a specific documented Exception when changed in ClusterSingletonServiceProvider
154 @SuppressWarnings("checkstyle:IllegalCatch")
155 private void startNetconfDeviceContext(final InstanceIdentifier<Node> instanceIdentifier, final Node node) {
156 final NetconfNode netconfNode = node.augmentation(NetconfNode.class);
157 Preconditions.checkNotNull(netconfNode);
158 Preconditions.checkNotNull(netconfNode.getHost());
159 Preconditions.checkNotNull(netconfNode.getHost().getIpAddress());
161 final Timeout actorResponseWaitTime = new Timeout(Duration.create(netconfNode.getActorResponseWaitTime(),
164 final ServiceGroupIdentifier serviceGroupIdent =
165 ServiceGroupIdentifier.create(instanceIdentifier.toString());
167 final NetconfTopologyContext newNetconfTopologyContext = newNetconfTopologyContext(
168 createSetup(instanceIdentifier, node), serviceGroupIdent, actorResponseWaitTime);
173 final ClusterSingletonServiceRegistration clusterSingletonServiceRegistration =
174 clusterSingletonServiceProvider.registerClusterSingletonService(newNetconfTopologyContext);
175 clusterRegistrations.put(instanceIdentifier, clusterSingletonServiceRegistration);
176 contexts.put(instanceIdentifier, newNetconfTopologyContext);
178 } catch (final RuntimeException e) {
179 LOG.warn("Unable to register cluster singleton service {}, trying again", newNetconfTopologyContext, e);
182 LOG.error("Unable to register cluster singleton service {} - done trying, closing topology context",
183 newNetconfTopologyContext, e);
184 close(newNetconfTopologyContext);
191 private void stopNetconfDeviceContext(final InstanceIdentifier<Node> instanceIdentifier) {
192 final NetconfTopologyContext netconfTopologyContext = contexts.remove(instanceIdentifier);
193 if (netconfTopologyContext != null) {
194 close(clusterRegistrations.remove(instanceIdentifier));
195 close(netconfTopologyContext);
200 protected NetconfTopologyContext newNetconfTopologyContext(NetconfTopologySetup setup,
201 ServiceGroupIdentifier serviceGroupIdent, Timeout actorResponseWaitTime) {
202 return new NetconfTopologyContext(setup, serviceGroupIdent, actorResponseWaitTime, mountPointService);
206 public void close() {
207 if (dataChangeListenerRegistration != null) {
208 dataChangeListenerRegistration.close();
209 dataChangeListenerRegistration = null;
212 contexts.values().forEach(NetconfTopologyManager::close);
213 clusterRegistrations.values().forEach(NetconfTopologyManager::close);
216 clusterRegistrations.clear();
219 @SuppressWarnings("checkstyle:IllegalCatch")
220 private static void close(AutoCloseable closeable) {
223 } catch (Exception e) {
224 LOG.warn("Error closing {}", closeable, e);
229 * Sets the private key path from location specified in configuration file using blueprint.
231 public void setPrivateKeyPath(final String privateKeyPath) {
232 this.privateKeyPath = privateKeyPath;
236 * Sets the private key passphrase from location specified in configuration file using blueprint.
238 public void setPrivateKeyPassphrase(final String privateKeyPassphrase) {
239 this.privateKeyPassphrase = privateKeyPassphrase;
242 private ListenerRegistration<NetconfTopologyManager> registerDataTreeChangeListener() {
243 final WriteTransaction wtx = dataBroker.newWriteOnlyTransaction();
244 initTopology(wtx, LogicalDatastoreType.CONFIGURATION);
245 initTopology(wtx, LogicalDatastoreType.OPERATIONAL);
246 wtx.commit().addCallback(new FutureCallback<CommitInfo>() {
248 public void onSuccess(final CommitInfo result) {
249 LOG.debug("topology initialization successful");
253 public void onFailure(@Nonnull final Throwable throwable) {
254 LOG.error("Unable to initialize netconf-topology, {}", throwable);
256 }, MoreExecutors.directExecutor());
258 LOG.debug("Registering datastore listener");
259 return dataBroker.registerDataTreeChangeListener(
260 new DataTreeIdentifier<>(LogicalDatastoreType.CONFIGURATION,
261 NetconfTopologyUtils.createTopologyListPath(topologyId).child(Node.class)), this);
264 private void initTopology(final WriteTransaction wtx, final LogicalDatastoreType datastoreType) {
265 final NetworkTopology networkTopology = new NetworkTopologyBuilder().build();
266 final InstanceIdentifier<NetworkTopology> networkTopologyId =
267 InstanceIdentifier.builder(NetworkTopology.class).build();
268 wtx.merge(datastoreType, networkTopologyId, networkTopology);
269 final Topology topology = new TopologyBuilder().setTopologyId(new TopologyId(topologyId)).build();
270 wtx.merge(datastoreType, networkTopologyId.child(Topology.class,
271 new TopologyKey(new TopologyId(topologyId))), topology);
274 private NetconfTopologySetup createSetup(final InstanceIdentifier<Node> instanceIdentifier, final Node node) {
275 final NetconfTopologySetupBuilder builder = NetconfTopologySetupBuilder.create()
276 .setClusterSingletonServiceProvider(clusterSingletonServiceProvider)
277 .setDataBroker(dataBroker)
278 .setInstanceIdentifier(instanceIdentifier)
279 .setRpcProviderRegistry(rpcProviderRegistry)
281 .setActorSystem(actorSystem)
282 .setEventExecutor(eventExecutor)
283 .setKeepaliveExecutor(keepaliveExecutor)
284 .setProcessingExecutor(processingExecutor)
285 .setTopologyId(topologyId)
286 .setNetconfClientDispatcher(clientDispatcher)
287 .setSchemaResourceDTO(NetconfTopologyUtils.setupSchemaCacheDTO(node))
288 .setIdleTimeout(writeTxIdleTimeout)
289 .setPrivateKeyPath(privateKeyPath)
290 .setPrivateKeyPassphrase(privateKeyPassphrase)
291 .setEncryptionService(encryptionService);
293 return builder.build();