2 * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.netconf.topology.singleton.impl;
11 import akka.actor.ActorSystem;
12 import akka.util.Timeout;
13 import com.google.common.base.Preconditions;
14 import com.google.common.util.concurrent.FutureCallback;
15 import com.google.common.util.concurrent.Futures;
16 import io.netty.util.concurrent.EventExecutor;
17 import java.util.Collection;
18 import java.util.HashMap;
20 import java.util.concurrent.TimeUnit;
21 import javax.annotation.Nonnull;
22 import org.opendaylight.controller.cluster.ActorSystemProvider;
23 import org.opendaylight.controller.config.threadpool.ScheduledThreadPool;
24 import org.opendaylight.controller.config.threadpool.ThreadPool;
25 import org.opendaylight.controller.md.sal.binding.api.ClusteredDataTreeChangeListener;
26 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
27 import org.opendaylight.controller.md.sal.binding.api.DataObjectModification;
28 import org.opendaylight.controller.md.sal.binding.api.DataTreeIdentifier;
29 import org.opendaylight.controller.md.sal.binding.api.DataTreeModification;
30 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
31 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
32 import org.opendaylight.controller.md.sal.dom.api.DOMMountPointService;
33 import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
34 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
35 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceRegistration;
36 import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier;
37 import org.opendaylight.netconf.client.NetconfClientDispatcher;
38 import org.opendaylight.netconf.topology.singleton.api.NetconfTopologySingletonService;
39 import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup;
40 import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup.NetconfTopologySetupBuilder;
41 import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologyUtils;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNode;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.topology.singleton.config.rev170419.Config;
44 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology;
45 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopologyBuilder;
46 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
47 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.TopologyId;
48 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
49 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyBuilder;
50 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey;
51 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
52 import org.opendaylight.yangtools.concepts.ListenerRegistration;
53 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
54 import org.slf4j.Logger;
55 import org.slf4j.LoggerFactory;
56 import scala.concurrent.duration.Duration;
/**
 * Data tree change listener for {@link Node} entries that keeps one {@link NetconfTopologyContext}
 * and one {@link ClusterSingletonServiceRegistration} per node instance identifier.
 */
58 public class NetconfTopologyManager
59 implements ClusteredDataTreeChangeListener<Node>, NetconfTopologySingletonService, AutoCloseable {
61 private static final Logger LOG = LoggerFactory.getLogger(NetconfTopologyManager.class);
// Topology contexts keyed by the instance identifier of the node they were created for.
// NOTE(review): plain HashMap - confirm that all access happens on a single thread.
63 private final Map<InstanceIdentifier<Node>, NetconfTopologyContext> contexts = new HashMap<>();
// Cluster singleton registrations, keyed by the same instance identifiers as contexts.
64 private final Map<InstanceIdentifier<Node>, ClusterSingletonServiceRegistration>
65 clusterRegistrations = new HashMap<>();
// Collaborators received through the constructor; they are handed on to every NetconfTopologySetup.
67 private final DataBroker dataBroker;
68 private final RpcProviderRegistry rpcProviderRegistry;
69 private final ClusterSingletonServiceProvider clusterSingletonServiceProvider;
70 private final ScheduledThreadPool keepaliveExecutor;
71 private final ThreadPool processingExecutor;
72 private final ActorSystem actorSystem;
73 private final EventExecutor eventExecutor;
74 private final NetconfClientDispatcher clientDispatcher;
75 private final String topologyId;
// Built from the configured write-transaction idle timeout, interpreted in seconds.
76 private final Duration writeTxIdleTimeout;
77 private final DOMMountPointService mountPointService;
// Not final: assigned after construction and reset to null by close().
79 private ListenerRegistration<NetconfTopologyManager> dataChangeListenerRegistration;
/**
 * Stores the collaborators needed to build per-node topology setups.
 *
 * <p>Every argument except {@code config} and {@code mountPointService} is validated with
 * {@code Preconditions.checkNotNull}. The actor system is taken from the given provider, and the
 * write-transaction idle timeout is read from {@code config} and interpreted in seconds.
 *
 * @param dataBroker broker used for the topology transaction and the listener registration
 * @param rpcProviderRegistry registry passed on to each topology setup
 * @param clusterSingletonServiceProvider provider with which node contexts are registered
 * @param keepaliveExecutor executor passed on to each topology setup
 * @param processingExecutor executor passed on to each topology setup
 * @param actorSystemProvider source of the actor system
 * @param eventExecutor executor passed on to each topology setup
 * @param clientDispatcher dispatcher passed on to each topology setup
 * @param topologyId id of the topology this manager handles
 * @param config source of the write-transaction idle timeout
 * @param mountPointService service passed to every created topology context
 */
81 public NetconfTopologyManager(final DataBroker dataBroker, final RpcProviderRegistry rpcProviderRegistry,
82 final ClusterSingletonServiceProvider clusterSingletonServiceProvider,
83 final ScheduledThreadPool keepaliveExecutor, final ThreadPool processingExecutor,
84 final ActorSystemProvider actorSystemProvider,
85 final EventExecutor eventExecutor, final NetconfClientDispatcher clientDispatcher,
86 final String topologyId, final Config config,
87 final DOMMountPointService mountPointService) {
88 this.dataBroker = Preconditions.checkNotNull(dataBroker);
89 this.rpcProviderRegistry = Preconditions.checkNotNull(rpcProviderRegistry);
90 this.clusterSingletonServiceProvider = Preconditions.checkNotNull(clusterSingletonServiceProvider);
91 this.keepaliveExecutor = Preconditions.checkNotNull(keepaliveExecutor);
92 this.processingExecutor = Preconditions.checkNotNull(processingExecutor);
// The provider itself is null-checked; only the actor system it returns is kept.
93 this.actorSystem = Preconditions.checkNotNull(actorSystemProvider).getActorSystem();
94 this.eventExecutor = Preconditions.checkNotNull(eventExecutor);
95 this.clientDispatcher = Preconditions.checkNotNull(clientDispatcher);
96 this.topologyId = Preconditions.checkNotNull(topologyId);
// NOTE(review): config is dereferenced without an explicit checkNotNull, unlike the arguments above.
97 this.writeTxIdleTimeout = Duration.apply(config.getWriteTransactionIdleTimeout(), TimeUnit.SECONDS);
// NOTE(review): stored without a null check - confirm whether a null mount point service is allowed.
98 this.mountPointService = mountPointService;
101 // Blueprint init method
// Registers this manager as a data tree change listener for the configured topology and keeps the
// registration so that close() can release it.
103 dataChangeListenerRegistration = registerDataTreeChangeListener(topologyId);
/**
 * Processes a batch of {@link Node} modifications. For each change the node id is resolved from
 * the modified node's identifier, and the modification type selects whether the node's context
 * is refreshed, started or stopped.
 *
 * @param changes modifications delivered to this listener
 */
107 public void onDataTreeChanged(@Nonnull final Collection<DataTreeModification<Node>> changes) {
108 for (final DataTreeModification<Node> change : changes) {
109 final DataObjectModification<Node> rootNode = change.getRootNode();
// The root path identifier is the key used in the contexts and clusterRegistrations maps.
110 final InstanceIdentifier<Node> dataModifIdent = change.getRootPath().getRootIdentifier();
// The node id is only used in the log messages below.
111 final NodeId nodeId = NetconfTopologyUtils.getNodeId(rootNode.getIdentifier());
112 switch (rootNode.getModificationType()) {
113 case SUBTREE_MODIFIED:
114 LOG.debug("Config for node {} updated", nodeId);
115 refreshNetconfDeviceContext(dataModifIdent, rootNode.getDataAfter());
// A node that already has a context is refreshed instead of being started again.
118 if (contexts.containsKey(dataModifIdent)) {
119 LOG.debug("RemoteDevice{{}} was already configured, reconfiguring node...", nodeId);
120 refreshNetconfDeviceContext(dataModifIdent, rootNode.getDataAfter());
// NOTE(review): presumably the alternative branch for a node without a context - confirm.
122 LOG.debug("Config for node {} created", nodeId);
123 startNetconfDeviceContext(dataModifIdent, rootNode.getDataAfter());
// Deletion of the node's configuration stops its context.
127 LOG.debug("Config for node {} deleted", nodeId);
128 stopNetconfDeviceContext(dataModifIdent);
// NOTE(review): presumably the fallback for modification types not handled above - confirm.
131 LOG.warn("Unknown operation for {}.", nodeId);
/**
 * Builds a fresh setup for the node and passes it to the context already stored for the given
 * instance identifier.
 *
 * @param instanceIdentifier key under which the node's context is stored
 * @param node node data the new setup is built from
 */
136 private void refreshNetconfDeviceContext(final InstanceIdentifier<Node> instanceIdentifier, final Node node) {
// NOTE(review): the lookup result is dereferenced without a null check, and the SUBTREE_MODIFIED
// path calls this method without first checking contexts - confirm a context always exists here.
137 final NetconfTopologyContext context = contexts.get(instanceIdentifier);
138 context.refresh(createSetup(instanceIdentifier, node));
/**
 * Creates a {@link NetconfTopologyContext} for the node, registers it with the cluster singleton
 * service provider and records both the registration and the context under the node's instance
 * identifier.
 *
 * @param instanceIdentifier key under which the context and registration are stored
 * @param node node data; its NetconfNode augmentation, host and IP address must be non-null
 */
141 // ClusterSingletonServiceRegistration registerClusterSingletonService method throws a Runtime exception if there
142 // are problems with registration and client has to deal with it. Only thing we can do if this error occurs is to
143 // retry registration several times and log the error.
144 // TODO change to a specific documented Exception when changed in ClusterSingletonServiceProvider
145 @SuppressWarnings("checkstyle:IllegalCatch")
146 private void startNetconfDeviceContext(final InstanceIdentifier<Node> instanceIdentifier, final Node node) {
147 final NetconfNode netconfNode = node.getAugmentation(NetconfNode.class);
// Fail fast when the node lacks the NETCONF augmentation or its host/IP address.
148 Preconditions.checkNotNull(netconfNode);
149 Preconditions.checkNotNull(netconfNode.getHost());
150 Preconditions.checkNotNull(netconfNode.getHost().getIpAddress());
// The actor response wait time comes from the node's own configuration.
152 final Timeout actorResponseWaitTime = new Timeout(Duration.create(netconfNode.getActorResponseWaitTime(),
// The service group identifier is derived from the string form of the instance identifier.
155 final ServiceGroupIdentifier serviceGroupIdent =
156 ServiceGroupIdentifier.create(instanceIdentifier.toString());
158 final NetconfTopologyContext newNetconfTopologyContext =
159 new NetconfTopologyContext(createSetup(instanceIdentifier, node), serviceGroupIdent,
160 actorResponseWaitTime, mountPointService);
// The maps are updated only after registerClusterSingletonService has returned.
165 final ClusterSingletonServiceRegistration clusterSingletonServiceRegistration =
166 clusterSingletonServiceProvider.registerClusterSingletonService(newNetconfTopologyContext);
167 clusterRegistrations.put(instanceIdentifier, clusterSingletonServiceRegistration);
168 contexts.put(instanceIdentifier, newNetconfTopologyContext);
// NOTE(review): the log messages indicate a bounded retry that ends by closing the context -
// confirm the retry limit and the cleanup path.
170 } catch (final RuntimeException e) {
171 LOG.warn("Unable to register cluster singleton service {}, trying again", newNetconfTopologyContext, e);
174 LOG.error("Unable to register cluster singleton service {} - done trying, closing topology context",
175 newNetconfTopologyContext, e);
/**
 * Closes the cluster registration and the topology context stored for the given instance
 * identifier and removes both map entries. Nothing is closed when no context is known for the
 * identifier.
 *
 * @param instanceIdentifier key of the node whose context is to be stopped
 */
184 @SuppressWarnings("checkstyle:IllegalCatch")
185 private void stopNetconfDeviceContext(final InstanceIdentifier<Node> instanceIdentifier) {
186 if (contexts.containsKey(instanceIdentifier)) {
// The cluster registration is closed before the context itself.
188 clusterRegistrations.get(instanceIdentifier).close();
189 contexts.get(instanceIdentifier).closeFinal();
190 } catch (final Exception e) {
// NOTE(review): the caught exception is not passed to the logger, so its stack trace is lost;
// close() logs the same kind of failure with the exception attached.
191 LOG.warn("Error at closing topology context. InstanceIdentifier: " + instanceIdentifier);
193 contexts.remove(instanceIdentifier);
194 clusterRegistrations.remove(instanceIdentifier);
/**
 * Shuts the manager down: closes the data tree change listener registration if one is present,
 * calls closeFinal() on every stored context, closes every cluster registration and finally
 * clears the registration map. A failure on one entry is logged and does not stop the
 * remaining entries from being processed.
 */
198 @SuppressWarnings("checkstyle:IllegalCatch")
200 public void close() {
// Resetting the field to null makes a repeated close() skip the listener registration.
201 if (dataChangeListenerRegistration != null) {
202 dataChangeListenerRegistration.close();
203 dataChangeListenerRegistration = null;
205 contexts.forEach((instanceIdentifier, netconfTopologyContext) -> {
207 netconfTopologyContext.closeFinal();
208 } catch (final Exception e) {
209 LOG.error("Error at closing topology context. InstanceIdentifier: " + instanceIdentifier, e);
212 clusterRegistrations.forEach((instanceIdentifier, clusterSingletonServiceRegistration) -> {
214 clusterSingletonServiceRegistration.close();
215 } catch (final Exception e) {
216 LOG.error("Error at unregistering from cluster. InstanceIdentifier: " + instanceIdentifier, e);
// NOTE(review): confirm that contexts is emptied here as well, not only clusterRegistrations.
220 clusterRegistrations.clear();
/**
 * Merges the topology skeleton into the CONFIGURATION and OPERATIONAL datastores in a single
 * write-only transaction, then registers this manager as a listener for {@link Node} children
 * of the topology in the CONFIGURATION datastore.
 *
 * @param topologyId id of the topology to initialize and listen on
 * @return the registration of this manager as a data tree change listener
 */
223 private ListenerRegistration<NetconfTopologyManager> registerDataTreeChangeListener(final String topologyId) {
224 final WriteTransaction wtx = dataBroker.newWriteOnlyTransaction();
225 initTopology(wtx, LogicalDatastoreType.CONFIGURATION, topologyId);
226 initTopology(wtx, LogicalDatastoreType.OPERATIONAL, topologyId);
// The outcome of the submit is only logged; this method does not wait on the returned future
// before registering the listener below.
227 Futures.addCallback(wtx.submit(), new FutureCallback<Void>() {
229 public void onSuccess(final Void result) {
230 LOG.debug("topology initialization successful");
234 public void onFailure(@Nonnull final Throwable throwable) {
// NOTE(review): the only argument is a Throwable, so the logger presumably treats it as the
// exception and leaves the "{}" placeholder unfilled - verify against the SLF4J documentation.
235 LOG.error("Unable to initialize netconf-topology, {}", throwable);
239 LOG.debug("Registering datastore listener");
240 return dataBroker.registerDataTreeChangeListener(
241 new DataTreeIdentifier<>(LogicalDatastoreType.CONFIGURATION,
242 NetconfTopologyUtils.createTopologyListPath(topologyId).child(Node.class)), this);
/**
 * Adds two merge operations to the given transaction: one for an empty {@link NetworkTopology}
 * container and one for a {@link Topology} that carries only the given id. The transaction is
 * not submitted here.
 *
 * @param wtx transaction the merges are added to
 * @param datastoreType datastore the merges target
 * @param topologyId id of the topology entry to create
 */
245 private void initTopology(final WriteTransaction wtx, final LogicalDatastoreType datastoreType,
246 final String topologyId) {
247 final NetworkTopology networkTopology = new NetworkTopologyBuilder().build();
248 final InstanceIdentifier<NetworkTopology> networkTopologyId =
249 InstanceIdentifier.builder(NetworkTopology.class).build();
// The parent container is merged before the topology list entry beneath it.
250 wtx.merge(datastoreType, networkTopologyId, networkTopology);
251 final Topology topology = new TopologyBuilder().setTopologyId(new TopologyId(topologyId)).build();
// The list entry path is the container path extended with a key built from the same topology id.
252 wtx.merge(datastoreType, networkTopologyId.child(Topology.class,
253 new TopologyKey(new TopologyId(topologyId))), topology);
/**
 * Assembles a {@link NetconfTopologySetup} from this manager's collaborators, the node's
 * instance identifier and the schema resources derived from the node.
 *
 * @param instanceIdentifier identifier of the node the setup is built for
 * @param node node data used to set up the schema cache resources
 * @return the built setup
 */
256 private NetconfTopologySetup createSetup(final InstanceIdentifier<Node> instanceIdentifier, final Node node) {
// NOTE(review): confirm the builder chain below is complete before relying on this list.
257 final NetconfTopologySetupBuilder builder = NetconfTopologySetupBuilder.create()
258 .setClusterSingletonServiceProvider(clusterSingletonServiceProvider)
259 .setDataBroker(dataBroker)
260 .setInstanceIdentifier(instanceIdentifier)
261 .setRpcProviderRegistry(rpcProviderRegistry)
263 .setActorSystem(actorSystem)
264 .setEventExecutor(eventExecutor)
265 .setKeepaliveExecutor(keepaliveExecutor)
266 .setProcessingExecutor(processingExecutor)
267 .setTopologyId(topologyId)
268 .setNetconfClientDispatcher(clientDispatcher)
// Schema resources are derived from the node by NetconfTopologyUtils.
269 .setSchemaResourceDTO(NetconfTopologyUtils.setupSchemaCacheDTO(node))
270 .setIdleTimeout(writeTxIdleTimeout);
272 return builder.build();