2 * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.netconf.topology.singleton.impl;
11 import akka.actor.ActorSystem;
12 import akka.util.Timeout;
13 import com.google.common.base.Preconditions;
14 import com.google.common.util.concurrent.FutureCallback;
15 import com.google.common.util.concurrent.Futures;
16 import io.netty.util.concurrent.EventExecutor;
17 import java.util.Collection;
18 import java.util.HashMap;
20 import java.util.concurrent.TimeUnit;
21 import javax.annotation.Nonnull;
22 import org.opendaylight.controller.cluster.ActorSystemProvider;
23 import org.opendaylight.controller.config.threadpool.ScheduledThreadPool;
24 import org.opendaylight.controller.config.threadpool.ThreadPool;
25 import org.opendaylight.controller.md.sal.binding.api.ClusteredDataTreeChangeListener;
26 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
27 import org.opendaylight.controller.md.sal.binding.api.DataObjectModification;
28 import org.opendaylight.controller.md.sal.binding.api.DataTreeIdentifier;
29 import org.opendaylight.controller.md.sal.binding.api.DataTreeModification;
30 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
31 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
32 import org.opendaylight.controller.md.sal.dom.api.DOMMountPointService;
33 import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
34 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
35 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceRegistration;
36 import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier;
37 import org.opendaylight.netconf.client.NetconfClientDispatcher;
38 import org.opendaylight.netconf.topology.singleton.api.NetconfTopologySingletonService;
39 import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup;
40 import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup.NetconfTopologySetupBuilder;
41 import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologyUtils;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNode;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.topology.singleton.config.rev170419.Config;
44 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology;
45 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopologyBuilder;
46 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
47 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.TopologyId;
48 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
49 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyBuilder;
50 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey;
51 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
52 import org.opendaylight.yangtools.concepts.ListenerRegistration;
53 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
54 import org.slf4j.Logger;
55 import org.slf4j.LoggerFactory;
56 import scala.concurrent.duration.Duration;
58 public class NetconfTopologyManager
59 implements ClusteredDataTreeChangeListener<Node>, NetconfTopologySingletonService, AutoCloseable {
61 private static final Logger LOG = LoggerFactory.getLogger(NetconfTopologyManager.class);
63 private final Map<InstanceIdentifier<Node>, NetconfTopologyContext> contexts = new HashMap<>();
64 private final Map<InstanceIdentifier<Node>, ClusterSingletonServiceRegistration>
65 clusterRegistrations = new HashMap<>();
67 private final DataBroker dataBroker;
68 private final RpcProviderRegistry rpcProviderRegistry;
69 private final ClusterSingletonServiceProvider clusterSingletonServiceProvider;
70 private final ScheduledThreadPool keepaliveExecutor;
71 private final ThreadPool processingExecutor;
72 private final ActorSystem actorSystem;
73 private final EventExecutor eventExecutor;
74 private final NetconfClientDispatcher clientDispatcher;
75 private final String topologyId;
76 private final Duration writeTxIdleTimeout;
77 private final DOMMountPointService mountPointService;
79 private ListenerRegistration<NetconfTopologyManager> dataChangeListenerRegistration;
81 public NetconfTopologyManager(final DataBroker dataBroker, final RpcProviderRegistry rpcProviderRegistry,
82 final ClusterSingletonServiceProvider clusterSingletonServiceProvider,
83 final ScheduledThreadPool keepaliveExecutor, final ThreadPool processingExecutor,
84 final ActorSystemProvider actorSystemProvider,
85 final EventExecutor eventExecutor, final NetconfClientDispatcher clientDispatcher,
86 final String topologyId, final Config config,
87 final DOMMountPointService mountPointService) {
88 this.dataBroker = Preconditions.checkNotNull(dataBroker);
89 this.rpcProviderRegistry = Preconditions.checkNotNull(rpcProviderRegistry);
90 this.clusterSingletonServiceProvider = Preconditions.checkNotNull(clusterSingletonServiceProvider);
91 this.keepaliveExecutor = Preconditions.checkNotNull(keepaliveExecutor);
92 this.processingExecutor = Preconditions.checkNotNull(processingExecutor);
93 this.actorSystem = Preconditions.checkNotNull(actorSystemProvider).getActorSystem();
94 this.eventExecutor = Preconditions.checkNotNull(eventExecutor);
95 this.clientDispatcher = Preconditions.checkNotNull(clientDispatcher);
96 this.topologyId = Preconditions.checkNotNull(topologyId);
97 this.writeTxIdleTimeout = Duration.apply(config.getWriteTransactionIdleTimeout(), TimeUnit.SECONDS);
98 this.mountPointService = mountPointService;
101 // Blueprint init method
103 dataChangeListenerRegistration = registerDataTreeChangeListener(topologyId);
107 public void onDataTreeChanged(@Nonnull final Collection<DataTreeModification<Node>> changes) {
108 for (final DataTreeModification<Node> change : changes) {
109 final DataObjectModification<Node> rootNode = change.getRootNode();
110 final InstanceIdentifier<Node> dataModifIdent = change.getRootPath().getRootIdentifier();
111 final NodeId nodeId = NetconfTopologyUtils.getNodeId(rootNode.getIdentifier());
112 switch (rootNode.getModificationType()) {
113 case SUBTREE_MODIFIED:
114 LOG.debug("Config for node {} updated", nodeId);
115 refreshNetconfDeviceContext(dataModifIdent, rootNode.getDataAfter());
118 if (contexts.containsKey(dataModifIdent)) {
119 LOG.debug("RemoteDevice{{}} was already configured, reconfiguring node...", nodeId);
120 refreshNetconfDeviceContext(dataModifIdent, rootNode.getDataAfter());
122 LOG.debug("Config for node {} created", nodeId);
123 startNetconfDeviceContext(dataModifIdent, rootNode.getDataAfter());
127 LOG.debug("Config for node {} deleted", nodeId);
128 stopNetconfDeviceContext(dataModifIdent);
131 LOG.warn("Unknown operation for {}.", nodeId);
136 private void refreshNetconfDeviceContext(final InstanceIdentifier<Node> instanceIdentifier, final Node node) {
137 final NetconfTopologyContext context = contexts.get(instanceIdentifier);
138 context.refresh(createSetup(instanceIdentifier, node));
141 private void startNetconfDeviceContext(final InstanceIdentifier<Node> instanceIdentifier, final Node node) {
142 final NetconfNode netconfNode = node.getAugmentation(NetconfNode.class);
143 Preconditions.checkNotNull(netconfNode);
144 Preconditions.checkNotNull(netconfNode.getHost());
145 Preconditions.checkNotNull(netconfNode.getHost().getIpAddress());
147 final Timeout actorResponseWaitTime = new Timeout(Duration.create(netconfNode.getActorResponseWaitTime(),
150 final ServiceGroupIdentifier serviceGroupIdent =
151 ServiceGroupIdentifier.create(instanceIdentifier.toString());
153 final NetconfTopologyContext newNetconfTopologyContext =
154 new NetconfTopologyContext(createSetup(instanceIdentifier, node), serviceGroupIdent,
155 actorResponseWaitTime, mountPointService);
157 final ClusterSingletonServiceRegistration clusterSingletonServiceRegistration =
158 clusterSingletonServiceProvider.registerClusterSingletonService(newNetconfTopologyContext);
160 clusterRegistrations.put(instanceIdentifier, clusterSingletonServiceRegistration);
161 contexts.put(instanceIdentifier, newNetconfTopologyContext);
164 private void stopNetconfDeviceContext(final InstanceIdentifier<Node> instanceIdentifier) {
165 if (contexts.containsKey(instanceIdentifier)) {
167 clusterRegistrations.get(instanceIdentifier).close();
168 contexts.get(instanceIdentifier).closeFinal();
169 } catch (final Exception e) {
170 LOG.warn("Error at closing topology context. InstanceIdentifier: " + instanceIdentifier);
172 contexts.remove(instanceIdentifier);
173 clusterRegistrations.remove(instanceIdentifier);
178 public void close() {
179 if (dataChangeListenerRegistration != null) {
180 dataChangeListenerRegistration.close();
181 dataChangeListenerRegistration = null;
183 contexts.forEach((instanceIdentifier, netconfTopologyContext) -> {
185 netconfTopologyContext.closeFinal();
186 } catch (final Exception e) {
187 LOG.error("Error at closing topology context. InstanceIdentifier: " + instanceIdentifier, e);
190 clusterRegistrations.forEach((instanceIdentifier, clusterSingletonServiceRegistration) -> {
192 clusterSingletonServiceRegistration.close();
193 } catch (final Exception e) {
194 LOG.error("Error at unregistering from cluster. InstanceIdentifier: " + instanceIdentifier, e);
198 clusterRegistrations.clear();
201 private ListenerRegistration<NetconfTopologyManager> registerDataTreeChangeListener(final String topologyId) {
202 final WriteTransaction wtx = dataBroker.newWriteOnlyTransaction();
203 initTopology(wtx, LogicalDatastoreType.CONFIGURATION, topologyId);
204 initTopology(wtx, LogicalDatastoreType.OPERATIONAL, topologyId);
205 Futures.addCallback(wtx.submit(), new FutureCallback<Void>() {
207 public void onSuccess(final Void result) {
208 LOG.debug("topology initialization successful");
212 public void onFailure(@Nonnull final Throwable throwable) {
213 LOG.error("Unable to initialize netconf-topology, {}", throwable);
217 LOG.debug("Registering datastore listener");
218 return dataBroker.registerDataTreeChangeListener(
219 new DataTreeIdentifier<>(LogicalDatastoreType.CONFIGURATION,
220 NetconfTopologyUtils.createTopologyListPath(topologyId).child(Node.class)), this);
223 private void initTopology(final WriteTransaction wtx, final LogicalDatastoreType datastoreType, final String topologyId) {
224 final NetworkTopology networkTopology = new NetworkTopologyBuilder().build();
225 final InstanceIdentifier<NetworkTopology> networkTopologyId =
226 InstanceIdentifier.builder(NetworkTopology.class).build();
227 wtx.merge(datastoreType, networkTopologyId, networkTopology);
228 final Topology topology = new TopologyBuilder().setTopologyId(new TopologyId(topologyId)).build();
229 wtx.merge(datastoreType, networkTopologyId.child(Topology.class,
230 new TopologyKey(new TopologyId(topologyId))), topology);
233 private NetconfTopologySetup createSetup(final InstanceIdentifier<Node> instanceIdentifier, final Node node) {
234 final NetconfTopologySetupBuilder builder = NetconfTopologySetupBuilder.create()
235 .setClusterSingletonServiceProvider(clusterSingletonServiceProvider)
236 .setDataBroker(dataBroker)
237 .setInstanceIdentifier(instanceIdentifier)
238 .setRpcProviderRegistry(rpcProviderRegistry)
240 .setActorSystem(actorSystem)
241 .setEventExecutor(eventExecutor)
242 .setKeepaliveExecutor(keepaliveExecutor)
243 .setProcessingExecutor(processingExecutor)
244 .setTopologyId(topologyId)
245 .setNetconfClientDispatcher(clientDispatcher)
246 .setSchemaResourceDTO(NetconfTopologyUtils.setupSchemaCacheDTO(node))
247 .setIdleTimeout(writeTxIdleTimeout);
249 return builder.build();