/*
 * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
9 package org.opendaylight.netconf.topology.singleton.impl;
11 import akka.actor.ActorSystem;
12 import akka.util.Timeout;
13 import com.google.common.base.Preconditions;
14 import com.google.common.util.concurrent.FutureCallback;
15 import com.google.common.util.concurrent.Futures;
16 import io.netty.util.concurrent.EventExecutor;
17 import java.util.Collection;
18 import java.util.HashMap;
20 import java.util.concurrent.TimeUnit;
21 import javax.annotation.Nonnull;
22 import org.opendaylight.controller.cluster.ActorSystemProvider;
23 import org.opendaylight.controller.config.threadpool.ScheduledThreadPool;
24 import org.opendaylight.controller.config.threadpool.ThreadPool;
25 import org.opendaylight.controller.md.sal.binding.api.ClusteredDataTreeChangeListener;
26 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
27 import org.opendaylight.controller.md.sal.binding.api.DataObjectModification;
28 import org.opendaylight.controller.md.sal.binding.api.DataTreeIdentifier;
29 import org.opendaylight.controller.md.sal.binding.api.DataTreeModification;
30 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
31 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
32 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker;
33 import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
34 import org.opendaylight.controller.sal.core.api.Broker;
35 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
36 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceRegistration;
37 import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier;
38 import org.opendaylight.netconf.client.NetconfClientDispatcher;
39 import org.opendaylight.netconf.topology.singleton.api.NetconfTopologySingletonService;
40 import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup;
41 import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup.NetconfTopologySetupBuilder;
42 import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologyUtils;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNode;
44 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology;
45 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopologyBuilder;
46 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
47 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.TopologyId;
48 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
49 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyBuilder;
50 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey;
51 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
52 import org.opendaylight.yangtools.concepts.ListenerRegistration;
53 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
54 import org.slf4j.Logger;
55 import org.slf4j.LoggerFactory;
56 import scala.concurrent.duration.Duration;
58 public class NetconfTopologyManager
59 implements ClusteredDataTreeChangeListener<Node>, NetconfTopologySingletonService, AutoCloseable {
61 private static final Logger LOG = LoggerFactory.getLogger(NetconfTopologyManager.class);
63 private final Map<InstanceIdentifier<Node>, NetconfTopologyContext> contexts = new HashMap<>();
64 private final Map<InstanceIdentifier<Node>, ClusterSingletonServiceRegistration>
65 clusterRegistrations = new HashMap<>();
67 private ListenerRegistration<NetconfTopologyManager> dataChangeListenerRegistration;
69 private final DataBroker dataBroker;
70 private final RpcProviderRegistry rpcProviderRegistry;
71 private final ClusterSingletonServiceProvider clusterSingletonServiceProvider;
72 private final BindingAwareBroker bindingAwareBroker;
73 private final ScheduledThreadPool keepaliveExecutor;
74 private final ThreadPool processingExecutor;
75 private final Broker domBroker;
76 private final ActorSystem actorSystem;
77 private final EventExecutor eventExecutor;
78 private final NetconfClientDispatcher clientDispatcher;
79 private final String topologyId;
80 private final Duration writeTxIdleTimeout;
/**
 * Creates the manager and stores its collaborators.
 *
 * <p>Every reference argument is mandatory. A missing one is rejected with a
 * {@link NullPointerException} whose message names the offending argument.
 *
 * @param dataBroker broker used to initialize the topology and to register the listener
 * @param rpcProviderRegistry forwarded to every node setup
 * @param clusterSingletonServiceProvider provider the per-node contexts are registered with
 * @param bindingAwareBroker forwarded to every node setup
 * @param keepaliveExecutor forwarded to every node setup
 * @param processingExecutor forwarded to every node setup
 * @param domBroker forwarded to every node setup
 * @param actorSystemProvider source of the actor system forwarded to every node setup
 * @param eventExecutor forwarded to every node setup
 * @param clientDispatcher forwarded to every node setup
 * @param topologyId identifier of the topology this manager initializes and listens on
 * @param writeTxIdleTimeout idle timeout, interpreted as seconds, forwarded to every node setup
 * @throws NullPointerException if any reference argument is null
 */
public NetconfTopologyManager(final DataBroker dataBroker, final RpcProviderRegistry rpcProviderRegistry,
        final ClusterSingletonServiceProvider clusterSingletonServiceProvider,
        final BindingAwareBroker bindingAwareBroker,
        final ScheduledThreadPool keepaliveExecutor, final ThreadPool processingExecutor,
        final Broker domBroker, final ActorSystemProvider actorSystemProvider, final EventExecutor eventExecutor,
        final NetconfClientDispatcher clientDispatcher, final String topologyId,
        final int writeTxIdleTimeout) {
    this.dataBroker = Preconditions.checkNotNull(dataBroker, "dataBroker must not be null");
    this.rpcProviderRegistry = Preconditions.checkNotNull(rpcProviderRegistry,
            "rpcProviderRegistry must not be null");
    this.clusterSingletonServiceProvider = Preconditions.checkNotNull(clusterSingletonServiceProvider,
            "clusterSingletonServiceProvider must not be null");
    this.bindingAwareBroker = Preconditions.checkNotNull(bindingAwareBroker,
            "bindingAwareBroker must not be null");
    this.keepaliveExecutor = Preconditions.checkNotNull(keepaliveExecutor, "keepaliveExecutor must not be null");
    this.processingExecutor = Preconditions.checkNotNull(processingExecutor,
            "processingExecutor must not be null");
    this.domBroker = Preconditions.checkNotNull(domBroker, "domBroker must not be null");
    this.actorSystem = Preconditions.checkNotNull(actorSystemProvider,
            "actorSystemProvider must not be null").getActorSystem();
    this.eventExecutor = Preconditions.checkNotNull(eventExecutor, "eventExecutor must not be null");
    this.clientDispatcher = Preconditions.checkNotNull(clientDispatcher, "clientDispatcher must not be null");
    this.topologyId = Preconditions.checkNotNull(topologyId, "topologyId must not be null");
    this.writeTxIdleTimeout = Duration.apply(writeTxIdleTimeout, TimeUnit.SECONDS);
}
103 // Blueprint init method
// Registers this manager as a data tree change listener for the configured topology and
// keeps the returned registration in a field; close() uses that field to unregister.
105 dataChangeListenerRegistration = registerDataTreeChangeListener(topologyId);
109 public void onDataTreeChanged(@Nonnull final Collection<DataTreeModification<Node>> changes) {
110 for (final DataTreeModification<Node> change : changes) {
111 final DataObjectModification<Node> rootNode = change.getRootNode();
112 final InstanceIdentifier<Node> dataModifIdent = change.getRootPath().getRootIdentifier();
113 final NodeId nodeId = NetconfTopologyUtils.getNodeId(rootNode.getIdentifier());
114 switch (rootNode.getModificationType()) {
115 case SUBTREE_MODIFIED:
116 LOG.debug("Config for node {} updated", nodeId);
117 refreshNetconfDeviceContext(dataModifIdent, rootNode.getDataAfter());
120 if (contexts.containsKey(dataModifIdent)) {
121 LOG.debug("RemoteDevice{{}} was already configured, reconfiguring node...", nodeId);
122 refreshNetconfDeviceContext(dataModifIdent, rootNode.getDataAfter());
124 LOG.debug("Config for node {} created", nodeId);
125 startNetconfDeviceContext(dataModifIdent, rootNode.getDataAfter());
129 LOG.debug("Config for node {} deleted", nodeId);
130 stopNetconfDeviceContext(dataModifIdent);
133 LOG.warn("Unknown operation for {}.", nodeId);
138 private void refreshNetconfDeviceContext(final InstanceIdentifier<Node> instanceIdentifier, final Node node) {
139 final NetconfTopologyContext context = contexts.get(instanceIdentifier);
140 context.refresh(createSetup(instanceIdentifier, node));
/**
 * Creates a topology context for a newly configured node, registers it as a cluster
 * singleton service and records both the context and the registration under the node's path.
 *
 * @param instanceIdentifier path of the node; used as the key of both maps, and its string
 *                           form is used to create the service group identifier
 * @param node configuration of the node; it must carry a NetconfNode augmentation with a host
 *             that has an IP address
 * @throws NullPointerException if the augmentation, its host or the host's IP address is absent
 */
143 private void startNetconfDeviceContext(final InstanceIdentifier<Node> instanceIdentifier, final Node node) {
144 final NetconfNode netconfNode = node.getAugmentation(NetconfNode.class);
// Fail fast on incomplete configuration before anything is created or registered.
145 Preconditions.checkNotNull(netconfNode);
146 Preconditions.checkNotNull(netconfNode.getHost());
147 Preconditions.checkNotNull(netconfNode.getHost().getIpAddress());
// The actor response timeout is taken from the node's own configuration.
149 final Timeout actorResponseWaitTime = new Timeout(Duration.create(netconfNode.getActorResponseWaitTime(),
152 final ServiceGroupIdentifier serviceGroupIdent =
153 ServiceGroupIdentifier.create(instanceIdentifier.toString());
155 final NetconfTopologyContext newNetconfTopologyContext =
156 new NetconfTopologyContext(createSetup(instanceIdentifier, node), serviceGroupIdent,
157 actorResponseWaitTime);
159 final ClusterSingletonServiceRegistration clusterSingletonServiceRegistration =
160 clusterSingletonServiceProvider.registerClusterSingletonService(newNetconfTopologyContext);
// Both maps are keyed by the same path so stopNetconfDeviceContext() can find the pair again.
162 clusterRegistrations.put(instanceIdentifier, clusterSingletonServiceRegistration);
163 contexts.put(instanceIdentifier, newNetconfTopologyContext);
166 private void stopNetconfDeviceContext(final InstanceIdentifier<Node> instanceIdentifier) {
167 if (contexts.containsKey(instanceIdentifier)) {
169 clusterRegistrations.get(instanceIdentifier).close();
170 contexts.get(instanceIdentifier).closeFinal();
171 } catch (final Exception e) {
172 LOG.warn("Error at closing topology context. InstanceIdentifier: " + instanceIdentifier);
174 contexts.remove(instanceIdentifier);
175 clusterRegistrations.remove(instanceIdentifier);
180 public void close() {
181 if (dataChangeListenerRegistration != null) {
182 dataChangeListenerRegistration.close();
183 dataChangeListenerRegistration = null;
185 contexts.forEach((instanceIdentifier, netconfTopologyContext) -> {
187 netconfTopologyContext.closeFinal();
188 } catch (final Exception e) {
189 LOG.error("Error at closing topology context. InstanceIdentifier: " + instanceIdentifier, e);
192 clusterRegistrations.forEach((instanceIdentifier, clusterSingletonServiceRegistration) -> {
194 clusterSingletonServiceRegistration.close();
195 } catch (final Exception e) {
196 LOG.error("Error at unregistering from cluster. InstanceIdentifier: " + instanceIdentifier, e);
200 clusterRegistrations.clear();
203 private ListenerRegistration<NetconfTopologyManager> registerDataTreeChangeListener(final String topologyId) {
204 final WriteTransaction wtx = dataBroker.newWriteOnlyTransaction();
205 initTopology(wtx, LogicalDatastoreType.CONFIGURATION, topologyId);
206 initTopology(wtx, LogicalDatastoreType.OPERATIONAL, topologyId);
207 Futures.addCallback(wtx.submit(), new FutureCallback<Void>() {
209 public void onSuccess(final Void result) {
210 LOG.debug("topology initialization successful");
214 public void onFailure(@Nonnull final Throwable throwable) {
215 LOG.error("Unable to initialize netconf-topology, {}", throwable);
219 LOG.debug("Registering datastore listener");
220 return dataBroker.registerDataTreeChangeListener(
221 new DataTreeIdentifier<>(LogicalDatastoreType.CONFIGURATION,
222 NetconfTopologyUtils.createTopologyListPath(topologyId).child(Node.class)), this);
225 private void initTopology(final WriteTransaction wtx, final LogicalDatastoreType datastoreType, final String topologyId) {
226 final NetworkTopology networkTopology = new NetworkTopologyBuilder().build();
227 final InstanceIdentifier<NetworkTopology> networkTopologyId =
228 InstanceIdentifier.builder(NetworkTopology.class).build();
229 wtx.merge(datastoreType, networkTopologyId, networkTopology);
230 final Topology topology = new TopologyBuilder().setTopologyId(new TopologyId(topologyId)).build();
231 wtx.merge(datastoreType, networkTopologyId.child(Topology.class,
232 new TopologyKey(new TopologyId(topologyId))), topology);
/**
 * Assembles the setup object handed to a node's topology context.
 *
 * The setup combines the collaborators and settings stored in this manager with the
 * node-specific parts: the node's path and the schema resource DTO derived from the
 * node through NetconfTopologyUtils.setupSchemaCacheDTO(node).
 *
 * @param instanceIdentifier path of the node the setup is built for
 * @param node configuration of the node
 * @return the built setup
 */
235 private NetconfTopologySetup createSetup(final InstanceIdentifier<Node> instanceIdentifier, final Node node) {
236 final NetconfTopologySetupBuilder builder = NetconfTopologySetupBuilder.create()
237 .setClusterSingletonServiceProvider(clusterSingletonServiceProvider)
238 .setDataBroker(dataBroker)
239 .setInstanceIdentifier(instanceIdentifier)
240 .setRpcProviderRegistry(rpcProviderRegistry)
242 .setBindingAwareBroker(bindingAwareBroker)
243 .setActorSystem(actorSystem)
244 .setEventExecutor(eventExecutor)
245 .setDomBroker(domBroker)
246 .setKeepaliveExecutor(keepaliveExecutor)
247 .setProcessingExecutor(processingExecutor)
248 .setTopologyId(topologyId)
249 .setNetconfClientDispatcher(clientDispatcher)
250 .setSchemaResourceDTO(NetconfTopologyUtils.setupSchemaCacheDTO(node))
251 .setIdleTimeout(writeTxIdleTimeout);
253 return builder.build();