2 * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.netconf.topology.singleton.impl;
11 import akka.actor.ActorSystem;
12 import akka.util.Timeout;
13 import com.google.common.base.Preconditions;
14 import com.google.common.util.concurrent.FutureCallback;
15 import com.google.common.util.concurrent.Futures;
16 import io.netty.util.concurrent.EventExecutor;
17 import java.util.Collection;
18 import java.util.HashMap;
20 import java.util.concurrent.TimeUnit;
21 import javax.annotation.Nonnull;
22 import org.opendaylight.controller.cluster.ActorSystemProvider;
23 import org.opendaylight.controller.config.threadpool.ScheduledThreadPool;
24 import org.opendaylight.controller.config.threadpool.ThreadPool;
25 import org.opendaylight.controller.md.sal.binding.api.ClusteredDataTreeChangeListener;
26 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
27 import org.opendaylight.controller.md.sal.binding.api.DataObjectModification;
28 import org.opendaylight.controller.md.sal.binding.api.DataTreeIdentifier;
29 import org.opendaylight.controller.md.sal.binding.api.DataTreeModification;
30 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
31 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
32 import org.opendaylight.controller.md.sal.dom.api.DOMMountPointService;
33 import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
34 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
35 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceRegistration;
36 import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier;
37 import org.opendaylight.netconf.client.NetconfClientDispatcher;
38 import org.opendaylight.netconf.topology.singleton.api.NetconfTopologySingletonService;
39 import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup;
40 import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup.NetconfTopologySetupBuilder;
41 import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologyUtils;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNode;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.topology.singleton.config.rev170419.Config;
44 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology;
45 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopologyBuilder;
46 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
47 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.TopologyId;
48 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
49 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyBuilder;
50 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey;
51 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
52 import org.opendaylight.yangtools.concepts.ListenerRegistration;
53 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
54 import org.slf4j.Logger;
55 import org.slf4j.LoggerFactory;
56 import scala.concurrent.duration.Duration;
58 public class NetconfTopologyManager
59 implements ClusteredDataTreeChangeListener<Node>, NetconfTopologySingletonService, AutoCloseable {
61 private static final Logger LOG = LoggerFactory.getLogger(NetconfTopologyManager.class);
63 private final Map<InstanceIdentifier<Node>, NetconfTopologyContext> contexts = new HashMap<>();
64 private final Map<InstanceIdentifier<Node>, ClusterSingletonServiceRegistration>
65 clusterRegistrations = new HashMap<>();
67 private final DataBroker dataBroker;
68 private final RpcProviderRegistry rpcProviderRegistry;
69 private final ClusterSingletonServiceProvider clusterSingletonServiceProvider;
70 private final ScheduledThreadPool keepaliveExecutor;
71 private final ThreadPool processingExecutor;
72 private final ActorSystem actorSystem;
73 private final EventExecutor eventExecutor;
74 private final NetconfClientDispatcher clientDispatcher;
75 private final String topologyId;
76 private final Duration writeTxIdleTimeout;
77 private final DOMMountPointService mountPointService;
79 private ListenerRegistration<NetconfTopologyManager> dataChangeListenerRegistration;
81 public NetconfTopologyManager(final DataBroker dataBroker, final RpcProviderRegistry rpcProviderRegistry,
82 final ClusterSingletonServiceProvider clusterSingletonServiceProvider,
83 final ScheduledThreadPool keepaliveExecutor, final ThreadPool processingExecutor,
84 final ActorSystemProvider actorSystemProvider, final EventExecutor eventExecutor,
85 final NetconfClientDispatcher clientDispatcher, final String topologyId,
86 final Config config, final DOMMountPointService mountPointService) {
87 this.dataBroker = Preconditions.checkNotNull(dataBroker);
88 this.rpcProviderRegistry = Preconditions.checkNotNull(rpcProviderRegistry);
89 this.clusterSingletonServiceProvider = Preconditions.checkNotNull(clusterSingletonServiceProvider);
90 this.keepaliveExecutor = Preconditions.checkNotNull(keepaliveExecutor);
91 this.processingExecutor = Preconditions.checkNotNull(processingExecutor);
92 this.actorSystem = Preconditions.checkNotNull(actorSystemProvider).getActorSystem();
93 this.eventExecutor = Preconditions.checkNotNull(eventExecutor);
94 this.clientDispatcher = Preconditions.checkNotNull(clientDispatcher);
95 this.topologyId = Preconditions.checkNotNull(topologyId);
96 this.writeTxIdleTimeout = Duration.apply(config.getWriteTransactionIdleTimeout(), TimeUnit.SECONDS);
97 this.mountPointService = mountPointService;
100 // Blueprint init method
102 dataChangeListenerRegistration = registerDataTreeChangeListener(topologyId);
106 public void onDataTreeChanged(@Nonnull final Collection<DataTreeModification<Node>> changes) {
107 for (final DataTreeModification<Node> change : changes) {
108 final DataObjectModification<Node> rootNode = change.getRootNode();
109 final InstanceIdentifier<Node> dataModifIdent = change.getRootPath().getRootIdentifier();
110 final NodeId nodeId = NetconfTopologyUtils.getNodeId(rootNode.getIdentifier());
111 switch (rootNode.getModificationType()) {
112 case SUBTREE_MODIFIED:
113 LOG.debug("Config for node {} updated", nodeId);
114 refreshNetconfDeviceContext(dataModifIdent, rootNode.getDataAfter());
117 if (contexts.containsKey(dataModifIdent)) {
118 LOG.debug("RemoteDevice{{}} was already configured, reconfiguring node...", nodeId);
119 refreshNetconfDeviceContext(dataModifIdent, rootNode.getDataAfter());
121 LOG.debug("Config for node {} created", nodeId);
122 startNetconfDeviceContext(dataModifIdent, rootNode.getDataAfter());
126 LOG.debug("Config for node {} deleted", nodeId);
127 stopNetconfDeviceContext(dataModifIdent);
130 LOG.warn("Unknown operation for {}.", nodeId);
135 private void refreshNetconfDeviceContext(final InstanceIdentifier<Node> instanceIdentifier, final Node node) {
136 final NetconfTopologyContext context = contexts.get(instanceIdentifier);
137 context.refresh(createSetup(instanceIdentifier, node));
140 // ClusterSingletonServiceRegistration registerClusterSingletonService method throws a Runtime exception if there
141 // are problems with registration and client has to deal with it. Only thing we can do if this error occurs is to
142 // retry registration several times and log the error.
143 // TODO change to a specific documented Exception when changed in ClusterSingletonServiceProvider
144 @SuppressWarnings("checkstyle:IllegalCatch")
145 private void startNetconfDeviceContext(final InstanceIdentifier<Node> instanceIdentifier, final Node node) {
146 final NetconfNode netconfNode = node.getAugmentation(NetconfNode.class);
147 Preconditions.checkNotNull(netconfNode);
148 Preconditions.checkNotNull(netconfNode.getHost());
149 Preconditions.checkNotNull(netconfNode.getHost().getIpAddress());
151 final Timeout actorResponseWaitTime = new Timeout(Duration.create(netconfNode.getActorResponseWaitTime(),
154 final ServiceGroupIdentifier serviceGroupIdent =
155 ServiceGroupIdentifier.create(instanceIdentifier.toString());
157 final NetconfTopologyContext newNetconfTopologyContext =
158 new NetconfTopologyContext(createSetup(instanceIdentifier, node), serviceGroupIdent,
159 actorResponseWaitTime, mountPointService);
164 final ClusterSingletonServiceRegistration clusterSingletonServiceRegistration =
165 clusterSingletonServiceProvider.registerClusterSingletonService(newNetconfTopologyContext);
166 clusterRegistrations.put(instanceIdentifier, clusterSingletonServiceRegistration);
167 contexts.put(instanceIdentifier, newNetconfTopologyContext);
169 } catch (final RuntimeException e) {
170 LOG.warn("Unable to register cluster singleton service {}, trying again", newNetconfTopologyContext, e);
173 LOG.error("Unable to register cluster singleton service {} - done trying, closing topology context",
174 newNetconfTopologyContext, e);
183 private void stopNetconfDeviceContext(final InstanceIdentifier<Node> instanceIdentifier) {
184 if (contexts.containsKey(instanceIdentifier)) {
186 clusterRegistrations.get(instanceIdentifier).close();
187 contexts.get(instanceIdentifier).closeFinal();
188 } catch (final Exception e) {
189 LOG.warn("Error at closing topology context. InstanceIdentifier: " + instanceIdentifier);
191 contexts.remove(instanceIdentifier);
192 clusterRegistrations.remove(instanceIdentifier);
197 public void close() {
198 if (dataChangeListenerRegistration != null) {
199 dataChangeListenerRegistration.close();
200 dataChangeListenerRegistration = null;
202 contexts.forEach((instanceIdentifier, netconfTopologyContext) -> {
204 netconfTopologyContext.closeFinal();
205 } catch (final Exception e) {
206 LOG.error("Error at closing topology context. InstanceIdentifier: " + instanceIdentifier, e);
209 clusterRegistrations.forEach((instanceIdentifier, clusterSingletonServiceRegistration) -> {
211 clusterSingletonServiceRegistration.close();
212 } catch (final Exception e) {
213 LOG.error("Error at unregistering from cluster. InstanceIdentifier: " + instanceIdentifier, e);
217 clusterRegistrations.clear();
220 private ListenerRegistration<NetconfTopologyManager> registerDataTreeChangeListener(final String topologyId) {
221 final WriteTransaction wtx = dataBroker.newWriteOnlyTransaction();
222 initTopology(wtx, LogicalDatastoreType.CONFIGURATION, topologyId);
223 initTopology(wtx, LogicalDatastoreType.OPERATIONAL, topologyId);
224 Futures.addCallback(wtx.submit(), new FutureCallback<Void>() {
226 public void onSuccess(final Void result) {
227 LOG.debug("topology initialization successful");
231 public void onFailure(@Nonnull final Throwable throwable) {
232 LOG.error("Unable to initialize netconf-topology, {}", throwable);
236 LOG.debug("Registering datastore listener");
237 return dataBroker.registerDataTreeChangeListener(
238 new DataTreeIdentifier<>(LogicalDatastoreType.CONFIGURATION,
239 NetconfTopologyUtils.createTopologyListPath(topologyId).child(Node.class)), this);
242 private void initTopology(final WriteTransaction wtx, final LogicalDatastoreType datastoreType, final String topologyId) {
243 final NetworkTopology networkTopology = new NetworkTopologyBuilder().build();
244 final InstanceIdentifier<NetworkTopology> networkTopologyId =
245 InstanceIdentifier.builder(NetworkTopology.class).build();
246 wtx.merge(datastoreType, networkTopologyId, networkTopology);
247 final Topology topology = new TopologyBuilder().setTopologyId(new TopologyId(topologyId)).build();
248 wtx.merge(datastoreType, networkTopologyId.child(Topology.class,
249 new TopologyKey(new TopologyId(topologyId))), topology);
252 private NetconfTopologySetup createSetup(final InstanceIdentifier<Node> instanceIdentifier, final Node node) {
253 final NetconfTopologySetupBuilder builder = NetconfTopologySetupBuilder.create()
254 .setClusterSingletonServiceProvider(clusterSingletonServiceProvider)
255 .setDataBroker(dataBroker)
256 .setInstanceIdentifier(instanceIdentifier)
257 .setRpcProviderRegistry(rpcProviderRegistry)
259 .setActorSystem(actorSystem)
260 .setEventExecutor(eventExecutor)
261 .setKeepaliveExecutor(keepaliveExecutor)
262 .setProcessingExecutor(processingExecutor)
263 .setTopologyId(topologyId)
264 .setNetconfClientDispatcher(clientDispatcher)
265 .setSchemaResourceDTO(NetconfTopologyUtils.setupSchemaCacheDTO(node))
266 .setIdleTimeout(writeTxIdleTimeout);
268 return builder.build();