2 * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.netconf.topology.singleton.impl;
11 import akka.actor.ActorSystem;
12 import akka.util.Timeout;
13 import com.google.common.base.Preconditions;
14 import com.google.common.util.concurrent.FutureCallback;
15 import com.google.common.util.concurrent.Futures;
16 import io.netty.util.concurrent.EventExecutor;
17 import java.util.Collection;
18 import java.util.HashMap;
20 import java.util.concurrent.TimeUnit;
21 import javax.annotation.Nonnull;
22 import org.opendaylight.aaa.encrypt.AAAEncryptionService;
23 import org.opendaylight.controller.cluster.ActorSystemProvider;
24 import org.opendaylight.controller.config.threadpool.ScheduledThreadPool;
25 import org.opendaylight.controller.config.threadpool.ThreadPool;
26 import org.opendaylight.controller.md.sal.binding.api.ClusteredDataTreeChangeListener;
27 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
28 import org.opendaylight.controller.md.sal.binding.api.DataObjectModification;
29 import org.opendaylight.controller.md.sal.binding.api.DataTreeIdentifier;
30 import org.opendaylight.controller.md.sal.binding.api.DataTreeModification;
31 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
32 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
33 import org.opendaylight.controller.md.sal.dom.api.DOMMountPointService;
34 import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
35 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
36 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceRegistration;
37 import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier;
38 import org.opendaylight.netconf.client.NetconfClientDispatcher;
39 import org.opendaylight.netconf.topology.singleton.api.NetconfTopologySingletonService;
40 import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup;
41 import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup.NetconfTopologySetupBuilder;
42 import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologyUtils;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNode;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.topology.singleton.config.rev170419.Config;
45 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology;
46 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopologyBuilder;
47 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
48 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.TopologyId;
49 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
50 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyBuilder;
51 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey;
52 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
53 import org.opendaylight.yangtools.concepts.ListenerRegistration;
54 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
55 import org.slf4j.Logger;
56 import org.slf4j.LoggerFactory;
57 import scala.concurrent.duration.Duration;
59 public class NetconfTopologyManager
60 implements ClusteredDataTreeChangeListener<Node>, NetconfTopologySingletonService, AutoCloseable {
62 private static final Logger LOG = LoggerFactory.getLogger(NetconfTopologyManager.class);
64 private final Map<InstanceIdentifier<Node>, NetconfTopologyContext> contexts = new HashMap<>();
65 private final Map<InstanceIdentifier<Node>, ClusterSingletonServiceRegistration>
66 clusterRegistrations = new HashMap<>();
68 private final DataBroker dataBroker;
69 private final RpcProviderRegistry rpcProviderRegistry;
70 private final ClusterSingletonServiceProvider clusterSingletonServiceProvider;
71 private final ScheduledThreadPool keepaliveExecutor;
72 private final ThreadPool processingExecutor;
73 private final ActorSystem actorSystem;
74 private final EventExecutor eventExecutor;
75 private final NetconfClientDispatcher clientDispatcher;
76 private final String topologyId;
77 private final Duration writeTxIdleTimeout;
78 private final DOMMountPointService mountPointService;
79 private final AAAEncryptionService encryptionService;
80 private ListenerRegistration<NetconfTopologyManager> dataChangeListenerRegistration;
/**
 * Creates the manager and stores its collaborators. No listener is registered and no datastore
 * access happens here.
 *
 * @param dataBroker broker used to initialize the topology and register the change listener
 * @param rpcProviderRegistry passed on to each per-node setup
 * @param clusterSingletonServiceProvider provider the per-node contexts are registered with
 * @param keepaliveExecutor passed on to each per-node setup
 * @param processingExecutor passed on to each per-node setup
 * @param actorSystemProvider source of the actor system passed on to each per-node setup
 * @param eventExecutor passed on to each per-node setup
 * @param clientDispatcher passed on to each per-node setup
 * @param topologyId identifier of the topology this manager watches
 * @param config configuration supplying the write-transaction idle timeout, interpreted in seconds
 * @param mountPointService passed on to each per-node topology context
 * @param encryptionService passed on to each per-node setup
 * @throws NullPointerException if any argument other than {@code mountPointService} is null
 */
public NetconfTopologyManager(final DataBroker dataBroker, final RpcProviderRegistry rpcProviderRegistry,
        final ClusterSingletonServiceProvider clusterSingletonServiceProvider,
        final ScheduledThreadPool keepaliveExecutor, final ThreadPool processingExecutor,
        final ActorSystemProvider actorSystemProvider,
        final EventExecutor eventExecutor, final NetconfClientDispatcher clientDispatcher,
        final String topologyId, final Config config,
        final DOMMountPointService mountPointService,
        final AAAEncryptionService encryptionService) {

    this.dataBroker = Preconditions.checkNotNull(dataBroker, "dataBroker");
    this.rpcProviderRegistry = Preconditions.checkNotNull(rpcProviderRegistry, "rpcProviderRegistry");
    this.clusterSingletonServiceProvider =
            Preconditions.checkNotNull(clusterSingletonServiceProvider, "clusterSingletonServiceProvider");
    this.keepaliveExecutor = Preconditions.checkNotNull(keepaliveExecutor, "keepaliveExecutor");
    this.processingExecutor = Preconditions.checkNotNull(processingExecutor, "processingExecutor");
    this.actorSystem = Preconditions.checkNotNull(actorSystemProvider, "actorSystemProvider").getActorSystem();
    this.eventExecutor = Preconditions.checkNotNull(eventExecutor, "eventExecutor");
    this.clientDispatcher = Preconditions.checkNotNull(clientDispatcher, "clientDispatcher");
    this.topologyId = Preconditions.checkNotNull(topologyId, "topologyId");
    // Checked explicitly so that a missing config fails with a named argument instead of a bare NPE
    // from the dereference below.
    Preconditions.checkNotNull(config, "config");
    this.writeTxIdleTimeout = Duration.apply(config.getWriteTransactionIdleTimeout(), TimeUnit.SECONDS);
    // NOTE(review): the only collaborator stored without a null check - confirm that null is acceptable.
    this.mountPointService = mountPointService;
    this.encryptionService = Preconditions.checkNotNull(encryptionService, "encryptionService");
}
105 // Blueprint init method
107 dataChangeListenerRegistration = registerDataTreeChangeListener(topologyId);
111 public void onDataTreeChanged(@Nonnull final Collection<DataTreeModification<Node>> changes) {
112 for (final DataTreeModification<Node> change : changes) {
113 final DataObjectModification<Node> rootNode = change.getRootNode();
114 final InstanceIdentifier<Node> dataModifIdent = change.getRootPath().getRootIdentifier();
115 final NodeId nodeId = NetconfTopologyUtils.getNodeId(rootNode.getIdentifier());
116 switch (rootNode.getModificationType()) {
117 case SUBTREE_MODIFIED:
118 LOG.debug("Config for node {} updated", nodeId);
119 refreshNetconfDeviceContext(dataModifIdent, rootNode.getDataAfter());
122 if (contexts.containsKey(dataModifIdent)) {
123 LOG.debug("RemoteDevice{{}} was already configured, reconfiguring node...", nodeId);
124 refreshNetconfDeviceContext(dataModifIdent, rootNode.getDataAfter());
126 LOG.debug("Config for node {} created", nodeId);
127 startNetconfDeviceContext(dataModifIdent, rootNode.getDataAfter());
131 LOG.debug("Config for node {} deleted", nodeId);
132 stopNetconfDeviceContext(dataModifIdent);
135 LOG.warn("Unknown operation for {}.", nodeId);
140 private void refreshNetconfDeviceContext(final InstanceIdentifier<Node> instanceIdentifier, final Node node) {
141 final NetconfTopologyContext context = contexts.get(instanceIdentifier);
142 context.refresh(createSetup(instanceIdentifier, node));
145 // ClusterSingletonServiceRegistration registerClusterSingletonService method throws a Runtime exception if there
146 // are problems with registration and client has to deal with it. Only thing we can do if this error occurs is to
147 // retry registration several times and log the error.
148 // TODO change to a specific documented Exception when changed in ClusterSingletonServiceProvider
149 @SuppressWarnings("checkstyle:IllegalCatch")
150 private void startNetconfDeviceContext(final InstanceIdentifier<Node> instanceIdentifier, final Node node) {
151 final NetconfNode netconfNode = node.getAugmentation(NetconfNode.class);
152 Preconditions.checkNotNull(netconfNode);
153 Preconditions.checkNotNull(netconfNode.getHost());
154 Preconditions.checkNotNull(netconfNode.getHost().getIpAddress());
156 final Timeout actorResponseWaitTime = new Timeout(Duration.create(netconfNode.getActorResponseWaitTime(),
159 final ServiceGroupIdentifier serviceGroupIdent =
160 ServiceGroupIdentifier.create(instanceIdentifier.toString());
162 final NetconfTopologyContext newNetconfTopologyContext =
163 new NetconfTopologyContext(createSetup(instanceIdentifier, node), serviceGroupIdent,
164 actorResponseWaitTime, mountPointService);
169 final ClusterSingletonServiceRegistration clusterSingletonServiceRegistration =
170 clusterSingletonServiceProvider.registerClusterSingletonService(newNetconfTopologyContext);
171 clusterRegistrations.put(instanceIdentifier, clusterSingletonServiceRegistration);
172 contexts.put(instanceIdentifier, newNetconfTopologyContext);
174 } catch (final RuntimeException e) {
175 LOG.warn("Unable to register cluster singleton service {}, trying again", newNetconfTopologyContext, e);
178 LOG.error("Unable to register cluster singleton service {} - done trying, closing topology context",
179 newNetconfTopologyContext, e);
188 @SuppressWarnings("checkstyle:IllegalCatch")
189 private void stopNetconfDeviceContext(final InstanceIdentifier<Node> instanceIdentifier) {
190 if (contexts.containsKey(instanceIdentifier)) {
192 clusterRegistrations.get(instanceIdentifier).close();
193 contexts.get(instanceIdentifier).closeFinal();
194 } catch (final Exception e) {
195 LOG.warn("Error at closing topology context. InstanceIdentifier: " + instanceIdentifier);
197 contexts.remove(instanceIdentifier);
198 clusterRegistrations.remove(instanceIdentifier);
202 @SuppressWarnings("checkstyle:IllegalCatch")
204 public void close() {
205 if (dataChangeListenerRegistration != null) {
206 dataChangeListenerRegistration.close();
207 dataChangeListenerRegistration = null;
209 contexts.forEach((instanceIdentifier, netconfTopologyContext) -> {
211 netconfTopologyContext.closeFinal();
212 } catch (final Exception e) {
213 LOG.error("Error at closing topology context. InstanceIdentifier: " + instanceIdentifier, e);
216 clusterRegistrations.forEach((instanceIdentifier, clusterSingletonServiceRegistration) -> {
218 clusterSingletonServiceRegistration.close();
219 } catch (final Exception e) {
220 LOG.error("Error at unregistering from cluster. InstanceIdentifier: " + instanceIdentifier, e);
224 clusterRegistrations.clear();
227 private ListenerRegistration<NetconfTopologyManager> registerDataTreeChangeListener(final String topologyId) {
228 final WriteTransaction wtx = dataBroker.newWriteOnlyTransaction();
229 initTopology(wtx, LogicalDatastoreType.CONFIGURATION, topologyId);
230 initTopology(wtx, LogicalDatastoreType.OPERATIONAL, topologyId);
231 Futures.addCallback(wtx.submit(), new FutureCallback<Void>() {
233 public void onSuccess(final Void result) {
234 LOG.debug("topology initialization successful");
238 public void onFailure(@Nonnull final Throwable throwable) {
239 LOG.error("Unable to initialize netconf-topology, {}", throwable);
243 LOG.debug("Registering datastore listener");
244 return dataBroker.registerDataTreeChangeListener(
245 new DataTreeIdentifier<>(LogicalDatastoreType.CONFIGURATION,
246 NetconfTopologyUtils.createTopologyListPath(topologyId).child(Node.class)), this);
249 private void initTopology(final WriteTransaction wtx, final LogicalDatastoreType datastoreType,
250 final String topologyId) {
251 final NetworkTopology networkTopology = new NetworkTopologyBuilder().build();
252 final InstanceIdentifier<NetworkTopology> networkTopologyId =
253 InstanceIdentifier.builder(NetworkTopology.class).build();
254 wtx.merge(datastoreType, networkTopologyId, networkTopology);
255 final Topology topology = new TopologyBuilder().setTopologyId(new TopologyId(topologyId)).build();
256 wtx.merge(datastoreType, networkTopologyId.child(Topology.class,
257 new TopologyKey(new TopologyId(topologyId))), topology);
260 private NetconfTopologySetup createSetup(final InstanceIdentifier<Node> instanceIdentifier, final Node node) {
261 final NetconfTopologySetupBuilder builder = NetconfTopologySetupBuilder.create()
262 .setClusterSingletonServiceProvider(clusterSingletonServiceProvider)
263 .setDataBroker(dataBroker)
264 .setInstanceIdentifier(instanceIdentifier)
265 .setRpcProviderRegistry(rpcProviderRegistry)
267 .setActorSystem(actorSystem)
268 .setEventExecutor(eventExecutor)
269 .setKeepaliveExecutor(keepaliveExecutor)
270 .setProcessingExecutor(processingExecutor)
271 .setTopologyId(topologyId)
272 .setNetconfClientDispatcher(clientDispatcher)
273 .setSchemaResourceDTO(NetconfTopologyUtils.setupSchemaCacheDTO(node))
274 .setIdleTimeout(writeTxIdleTimeout)
275 .setEncryptionService(encryptionService);
277 return builder.build();