2 * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.netconf.topology.singleton.impl;
11 import akka.actor.ActorSystem;
12 import akka.util.Timeout;
13 import com.google.common.base.Preconditions;
14 import com.google.common.util.concurrent.FutureCallback;
15 import com.google.common.util.concurrent.Futures;
16 import io.netty.util.concurrent.EventExecutor;
17 import java.util.Collection;
18 import java.util.HashMap;
20 import java.util.concurrent.TimeUnit;
21 import javax.annotation.Nonnull;
22 import org.opendaylight.controller.cluster.ActorSystemProvider;
23 import org.opendaylight.controller.config.threadpool.ScheduledThreadPool;
24 import org.opendaylight.controller.config.threadpool.ThreadPool;
25 import org.opendaylight.controller.md.sal.binding.api.ClusteredDataTreeChangeListener;
26 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
27 import org.opendaylight.controller.md.sal.binding.api.DataObjectModification;
28 import org.opendaylight.controller.md.sal.binding.api.DataTreeIdentifier;
29 import org.opendaylight.controller.md.sal.binding.api.DataTreeModification;
30 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
31 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
32 import org.opendaylight.controller.md.sal.dom.api.DOMMountPointService;
33 import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
34 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
35 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceRegistration;
36 import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier;
37 import org.opendaylight.netconf.client.NetconfClientDispatcher;
38 import org.opendaylight.netconf.topology.singleton.api.NetconfTopologySingletonService;
39 import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup;
40 import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup.NetconfTopologySetupBuilder;
41 import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologyUtils;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNode;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.topology.singleton.config.rev170419.Config;
44 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology;
45 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopologyBuilder;
46 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
47 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.TopologyId;
48 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
49 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyBuilder;
50 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey;
51 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
52 import org.opendaylight.yangtools.concepts.ListenerRegistration;
53 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
54 import org.slf4j.Logger;
55 import org.slf4j.LoggerFactory;
56 import scala.concurrent.duration.Duration;
/**
 * Tracks NETCONF node configuration and keeps one {@link NetconfTopologyContext} and one
 * {@link ClusterSingletonServiceRegistration} per configured node.
 *
 * <p>Node changes arrive through {@code onDataTreeChanged}; {@code close()} releases the listener
 * registration, the contexts and the cluster registrations held here.
 */
58 public class NetconfTopologyManager
59 implements ClusteredDataTreeChangeListener<Node>, NetconfTopologySingletonService, AutoCloseable {
61 private static final Logger LOG = LoggerFactory.getLogger(NetconfTopologyManager.class);
// Topology contexts keyed by node path; entries are added when a node is started and removed when it is stopped.
63 private final Map<InstanceIdentifier<Node>, NetconfTopologyContext> contexts = new HashMap<>();
// Cluster-singleton registrations keyed by the same node path as 'contexts'; both maps are filled together.
64 private final Map<InstanceIdentifier<Node>, ClusterSingletonServiceRegistration>
65 clusterRegistrations = new HashMap<>();
// Collaborators supplied through the constructor; most are only forwarded into the per-node setup object.
67 private final DataBroker dataBroker;
68 private final RpcProviderRegistry rpcProviderRegistry;
69 private final ClusterSingletonServiceProvider clusterSingletonServiceProvider;
70 private final ScheduledThreadPool keepaliveExecutor;
71 private final ThreadPool processingExecutor;
// Obtained from the ActorSystemProvider passed to the constructor.
72 private final ActorSystem actorSystem;
73 private final EventExecutor eventExecutor;
74 private final NetconfClientDispatcher clientDispatcher;
// Identifier of the topology whose Node list is listened to and initialized in the datastore.
75 private final String topologyId;
// Built in the constructor from Config#getWriteTransactionIdleTimeout(), interpreted as seconds.
76 private final Duration writeTxIdleTimeout;
// Handed to every NetconfTopologyContext created by this manager.
77 private final DOMMountPointService mountPointService;
// Assigned by the init step and reset to null by close(); null while no listener is registered.
79 private ListenerRegistration<NetconfTopologyManager> dataChangeListenerRegistration;
/**
 * Stores the collaborators needed to build per-node topology contexts. Nothing is registered with the
 * datastore here; listener registration happens in the separate Blueprint init step.
 *
 * @param dataBroker broker used to initialize the topology and register the change listener
 * @param rpcProviderRegistry registry forwarded into each node setup
 * @param clusterSingletonServiceProvider provider used to register each node context as a cluster singleton
 * @param keepaliveExecutor executor forwarded into each node setup
 * @param processingExecutor executor forwarded into each node setup
 * @param actorSystemProvider source of the actor system forwarded into each node setup
 * @param eventExecutor executor forwarded into each node setup
 * @param clientDispatcher NETCONF client dispatcher forwarded into each node setup
 * @param topologyId identifier of the topology to manage
 * @param config configuration supplying the write-transaction idle timeout (seconds)
 * @param mountPointService mount point service handed to each created context; not null-checked here
 *                          (NOTE(review): confirm whether null is intentionally allowed)
 * @throws NullPointerException if any argument other than {@code mountPointService} is null; the message
 *                              names the offending argument
 */
public NetconfTopologyManager(final DataBroker dataBroker, final RpcProviderRegistry rpcProviderRegistry,
                              final ClusterSingletonServiceProvider clusterSingletonServiceProvider,
                              final ScheduledThreadPool keepaliveExecutor, final ThreadPool processingExecutor,
                              final ActorSystemProvider actorSystemProvider, final EventExecutor eventExecutor,
                              final NetconfClientDispatcher clientDispatcher, final String topologyId,
                              final Config config, final DOMMountPointService mountPointService) {
    this.dataBroker = Preconditions.checkNotNull(dataBroker, "dataBroker");
    this.rpcProviderRegistry = Preconditions.checkNotNull(rpcProviderRegistry, "rpcProviderRegistry");
    this.clusterSingletonServiceProvider =
            Preconditions.checkNotNull(clusterSingletonServiceProvider, "clusterSingletonServiceProvider");
    this.keepaliveExecutor = Preconditions.checkNotNull(keepaliveExecutor, "keepaliveExecutor");
    this.processingExecutor = Preconditions.checkNotNull(processingExecutor, "processingExecutor");
    this.actorSystem = Preconditions.checkNotNull(actorSystemProvider, "actorSystemProvider").getActorSystem();
    this.eventExecutor = Preconditions.checkNotNull(eventExecutor, "eventExecutor");
    this.clientDispatcher = Preconditions.checkNotNull(clientDispatcher, "clientDispatcher");
    this.topologyId = Preconditions.checkNotNull(topologyId, "topologyId");
    // Check explicitly so a missing config is reported by name rather than as a bare NPE on dereference.
    Preconditions.checkNotNull(config, "config");
    this.writeTxIdleTimeout = Duration.apply(config.getWriteTransactionIdleTimeout(), TimeUnit.SECONDS);
    this.mountPointService = mountPointService;
}
100 // Blueprint init method
// NOTE(review): the method signature is not visible in this view; per the comment above, the statement
// below is presumably the body of the init method invoked by Blueprint - confirm against the full file.
// Initializes the topology in the datastore, registers this manager as a data tree change listener and
// keeps the returned registration so that close() can release it.
102 dataChangeListenerRegistration = registerDataTreeChangeListener(topologyId);
// Dispatches each node modification to the matching context operation (refresh, start or stop)
// based on the modification type of the change's root node.
// NOTE(review): several structural lines of this method (case labels other than SUBTREE_MODIFIED,
// else/break/closing braces) are not visible in this view; branch notes below are inferred from the
// log messages and should be confirmed against the full file.
106 public void onDataTreeChanged(@Nonnull final Collection<DataTreeModification<Node>> changes) {
107 for (final DataTreeModification<Node> change : changes) {
108 final DataObjectModification<Node> rootNode = change.getRootNode();
// Path of the changed node; used as the key into 'contexts' and 'clusterRegistrations'.
109 final InstanceIdentifier<Node> dataModifIdent = change.getRootPath().getRootIdentifier();
// Node id is only used for log messages in this method.
110 final NodeId nodeId = NetconfTopologyUtils.getNodeId(rootNode.getIdentifier());
111 switch (rootNode.getModificationType()) {
112 case SUBTREE_MODIFIED:
113 LOG.debug("Config for node {} updated", nodeId);
114 refreshNetconfDeviceContext(dataModifIdent, rootNode.getDataAfter());
// Presumably the branch for a full write of the node: an already-known node is refreshed,
// an unknown one gets a new context - TODO confirm the case label.
117 if (contexts.containsKey(dataModifIdent)) {
118 LOG.debug("RemoteDevice{{}} was already configured, reconfiguring node...", nodeId);
119 refreshNetconfDeviceContext(dataModifIdent, rootNode.getDataAfter());
121 LOG.debug("Config for node {} created", nodeId);
122 startNetconfDeviceContext(dataModifIdent, rootNode.getDataAfter());
// Presumably the delete branch: the node's context and registration are torn down - TODO confirm.
126 LOG.debug("Config for node {} deleted", nodeId);
127 stopNetconfDeviceContext(dataModifIdent);
// Presumably the default branch for any other modification type - TODO confirm.
130 LOG.warn("Unknown operation for {}.", nodeId);
135 private void refreshNetconfDeviceContext(final InstanceIdentifier<Node> instanceIdentifier, final Node node) {
136 final NetconfTopologyContext context = contexts.get(instanceIdentifier);
137 context.refresh(createSetup(instanceIdentifier, node));
140 // ClusterSingletonServiceRegistration registerClusterSingletonService method throws a Runtime exception if there
141 // are problems with registration and client has to deal with it. Only thing we can do if this error occurs is to
142 // retry registration several times and log the error.
143 // TODO change to a specific documented Exception when changed in ClusterSingletonServiceProvider
//
// Builds a NetconfTopologyContext for the given node and registers it as a cluster singleton service.
// On success the registration and the context are stored under the node's path.
// NOTE(review): the retry loop, its attempt counter and the cleanup after the final failure are not
// visible in this view; the notes below only describe the statements that are shown.
144 @SuppressWarnings("checkstyle:IllegalCatch")
145 private void startNetconfDeviceContext(final InstanceIdentifier<Node> instanceIdentifier, final Node node) {
// The node must carry the NetconfNode augmentation with a host and an IP address; otherwise
// checkNotNull throws NullPointerException.
146 final NetconfNode netconfNode = node.getAugmentation(NetconfNode.class);
147 Preconditions.checkNotNull(netconfNode);
148 Preconditions.checkNotNull(netconfNode.getHost());
149 Preconditions.checkNotNull(netconfNode.getHost().getIpAddress());
// Per-node actor response timeout taken from the node configuration.
// NOTE(review): the continuation of this call (the time unit argument) is not visible here - confirm.
151 final Timeout actorResponseWaitTime = new Timeout(Duration.create(netconfNode.getActorResponseWaitTime(),
// The service group identifier is derived from the node path's string form.
154 final ServiceGroupIdentifier serviceGroupIdent =
155 ServiceGroupIdentifier.create(instanceIdentifier.toString());
157 final NetconfTopologyContext newNetconfTopologyContext =
158 new NetconfTopologyContext(createSetup(instanceIdentifier, node), serviceGroupIdent,
159 actorResponseWaitTime, mountPointService);
// Register with the cluster first; the maps are only updated once registration returned normally.
164 final ClusterSingletonServiceRegistration clusterSingletonServiceRegistration =
165 clusterSingletonServiceProvider.registerClusterSingletonService(newNetconfTopologyContext);
166 clusterRegistrations.put(instanceIdentifier, clusterSingletonServiceRegistration);
167 contexts.put(instanceIdentifier, newNetconfTopologyContext);
// A RuntimeException from registration is logged as a warning; per the message another attempt follows.
169 } catch (final RuntimeException e) {
170 LOG.warn("Unable to register cluster singleton service {}, trying again", newNetconfTopologyContext, e);
// Logged when no further attempts are made; per the message the new context is then closed
// (the closing statement itself is not visible here - TODO confirm).
173 LOG.error("Unable to register cluster singleton service {} - done trying, closing topology context",
174 newNetconfTopologyContext, e);
183 @SuppressWarnings("checkstyle:IllegalCatch")
184 private void stopNetconfDeviceContext(final InstanceIdentifier<Node> instanceIdentifier) {
185 if (contexts.containsKey(instanceIdentifier)) {
187 clusterRegistrations.get(instanceIdentifier).close();
188 contexts.get(instanceIdentifier).closeFinal();
189 } catch (final Exception e) {
190 LOG.warn("Error at closing topology context. InstanceIdentifier: " + instanceIdentifier);
192 contexts.remove(instanceIdentifier);
193 clusterRegistrations.remove(instanceIdentifier);
197 @SuppressWarnings("checkstyle:IllegalCatch")
199 public void close() {
200 if (dataChangeListenerRegistration != null) {
201 dataChangeListenerRegistration.close();
202 dataChangeListenerRegistration = null;
204 contexts.forEach((instanceIdentifier, netconfTopologyContext) -> {
206 netconfTopologyContext.closeFinal();
207 } catch (final Exception e) {
208 LOG.error("Error at closing topology context. InstanceIdentifier: " + instanceIdentifier, e);
211 clusterRegistrations.forEach((instanceIdentifier, clusterSingletonServiceRegistration) -> {
213 clusterSingletonServiceRegistration.close();
214 } catch (final Exception e) {
215 LOG.error("Error at unregistering from cluster. InstanceIdentifier: " + instanceIdentifier, e);
219 clusterRegistrations.clear();
222 private ListenerRegistration<NetconfTopologyManager> registerDataTreeChangeListener(final String topologyId) {
223 final WriteTransaction wtx = dataBroker.newWriteOnlyTransaction();
224 initTopology(wtx, LogicalDatastoreType.CONFIGURATION, topologyId);
225 initTopology(wtx, LogicalDatastoreType.OPERATIONAL, topologyId);
226 Futures.addCallback(wtx.submit(), new FutureCallback<Void>() {
228 public void onSuccess(final Void result) {
229 LOG.debug("topology initialization successful");
233 public void onFailure(@Nonnull final Throwable throwable) {
234 LOG.error("Unable to initialize netconf-topology, {}", throwable);
238 LOG.debug("Registering datastore listener");
239 return dataBroker.registerDataTreeChangeListener(
240 new DataTreeIdentifier<>(LogicalDatastoreType.CONFIGURATION,
241 NetconfTopologyUtils.createTopologyListPath(topologyId).child(Node.class)), this);
244 private void initTopology(final WriteTransaction wtx, final LogicalDatastoreType datastoreType,
245 final String topologyId) {
246 final NetworkTopology networkTopology = new NetworkTopologyBuilder().build();
247 final InstanceIdentifier<NetworkTopology> networkTopologyId =
248 InstanceIdentifier.builder(NetworkTopology.class).build();
249 wtx.merge(datastoreType, networkTopologyId, networkTopology);
250 final Topology topology = new TopologyBuilder().setTopologyId(new TopologyId(topologyId)).build();
251 wtx.merge(datastoreType, networkTopologyId.child(Topology.class,
252 new TopologyKey(new TopologyId(topologyId))), topology);
// Assembles the NetconfTopologySetup for one node from this manager's collaborators, the node path and
// the schema resources derived from the node.
// NOTE(review): a line appears to be missing from this view between setRpcProviderRegistry and
// setActorSystem; a further builder call may belong there - confirm against the full file.
255 private NetconfTopologySetup createSetup(final InstanceIdentifier<Node> instanceIdentifier, final Node node) {
256 final NetconfTopologySetupBuilder builder = NetconfTopologySetupBuilder.create()
257 .setClusterSingletonServiceProvider(clusterSingletonServiceProvider)
258 .setDataBroker(dataBroker)
259 .setInstanceIdentifier(instanceIdentifier)
260 .setRpcProviderRegistry(rpcProviderRegistry)
262 .setActorSystem(actorSystem)
263 .setEventExecutor(eventExecutor)
264 .setKeepaliveExecutor(keepaliveExecutor)
265 .setProcessingExecutor(processingExecutor)
266 .setTopologyId(topologyId)
267 .setNetconfClientDispatcher(clientDispatcher)
// Schema resources are computed per node by NetconfTopologyUtils.setupSchemaCacheDTO.
268 .setSchemaResourceDTO(NetconfTopologyUtils.setupSchemaCacheDTO(node))
// Idle timeout is the value derived from the configuration in the constructor.
269 .setIdleTimeout(writeTxIdleTimeout);
271 return builder.build();