Bug 8152: Transaction is already opened
[netconf.git] / netconf / netconf-topology-singleton / src / main / java / org / opendaylight / netconf / topology / singleton / impl / NetconfTopologyManager.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.netconf.topology.singleton.impl;
10
11 import akka.actor.ActorSystem;
12 import akka.util.Timeout;
13 import com.google.common.base.Preconditions;
14 import com.google.common.util.concurrent.FutureCallback;
15 import com.google.common.util.concurrent.Futures;
16 import io.netty.util.concurrent.EventExecutor;
17 import java.util.Collection;
18 import java.util.HashMap;
19 import java.util.Map;
20 import java.util.concurrent.TimeUnit;
21 import javax.annotation.Nonnull;
22 import org.opendaylight.controller.cluster.ActorSystemProvider;
23 import org.opendaylight.controller.config.threadpool.ScheduledThreadPool;
24 import org.opendaylight.controller.config.threadpool.ThreadPool;
25 import org.opendaylight.controller.md.sal.binding.api.ClusteredDataTreeChangeListener;
26 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
27 import org.opendaylight.controller.md.sal.binding.api.DataObjectModification;
28 import org.opendaylight.controller.md.sal.binding.api.DataTreeIdentifier;
29 import org.opendaylight.controller.md.sal.binding.api.DataTreeModification;
30 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
31 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
32 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker;
33 import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
34 import org.opendaylight.controller.sal.core.api.Broker;
35 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
36 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceRegistration;
37 import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier;
38 import org.opendaylight.netconf.client.NetconfClientDispatcher;
39 import org.opendaylight.netconf.topology.singleton.api.NetconfTopologySingletonService;
40 import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup;
41 import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup.NetconfTopologySetupBuilder;
42 import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologyUtils;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNode;
44 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology;
45 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopologyBuilder;
46 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
47 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.TopologyId;
48 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
49 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyBuilder;
50 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey;
51 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
52 import org.opendaylight.yangtools.concepts.ListenerRegistration;
53 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
54 import org.slf4j.Logger;
55 import org.slf4j.LoggerFactory;
56 import scala.concurrent.duration.Duration;
57
58 public class NetconfTopologyManager
59         implements ClusteredDataTreeChangeListener<Node>, NetconfTopologySingletonService, AutoCloseable {
60
61     private static final Logger LOG = LoggerFactory.getLogger(NetconfTopologyManager.class);
62
63     private final Map<InstanceIdentifier<Node>, NetconfTopologyContext> contexts = new HashMap<>();
64     private final Map<InstanceIdentifier<Node>, ClusterSingletonServiceRegistration>
65             clusterRegistrations = new HashMap<>();
66
67     private ListenerRegistration<NetconfTopologyManager> dataChangeListenerRegistration;
68
69     private final DataBroker dataBroker;
70     private final RpcProviderRegistry rpcProviderRegistry;
71     private final ClusterSingletonServiceProvider clusterSingletonServiceProvider;
72     private final BindingAwareBroker bindingAwareBroker;
73     private final ScheduledThreadPool keepaliveExecutor;
74     private final ThreadPool processingExecutor;
75     private final Broker domBroker;
76     private final ActorSystem actorSystem;
77     private final EventExecutor eventExecutor;
78     private final NetconfClientDispatcher clientDispatcher;
79     private final String topologyId;
80     private final Duration writeTxIdleTimeout;
81
82     public NetconfTopologyManager(final DataBroker dataBroker, final RpcProviderRegistry rpcProviderRegistry,
83                                   final ClusterSingletonServiceProvider clusterSingletonServiceProvider,
84                                   final BindingAwareBroker bindingAwareBroker,
85                                   final ScheduledThreadPool keepaliveExecutor, final ThreadPool processingExecutor,
86                                   final Broker domBroker, final ActorSystemProvider actorSystemProvider, final EventExecutor eventExecutor,
87                                   final NetconfClientDispatcher clientDispatcher, final String topologyId,
88                                   final int writeTxIdleTimeout) {
89         this.dataBroker = Preconditions.checkNotNull(dataBroker);
90         this.rpcProviderRegistry = Preconditions.checkNotNull(rpcProviderRegistry);
91         this.clusterSingletonServiceProvider = Preconditions.checkNotNull(clusterSingletonServiceProvider);
92         this.bindingAwareBroker = Preconditions.checkNotNull(bindingAwareBroker);
93         this.keepaliveExecutor = Preconditions.checkNotNull(keepaliveExecutor);
94         this.processingExecutor = Preconditions.checkNotNull(processingExecutor);
95         this.domBroker = Preconditions.checkNotNull(domBroker);
96         this.actorSystem = Preconditions.checkNotNull(actorSystemProvider).getActorSystem();
97         this.eventExecutor = Preconditions.checkNotNull(eventExecutor);
98         this.clientDispatcher = Preconditions.checkNotNull(clientDispatcher);
99         this.topologyId = Preconditions.checkNotNull(topologyId);
100         this.writeTxIdleTimeout = Duration.apply(writeTxIdleTimeout, TimeUnit.SECONDS);
101     }
102
103     // Blueprint init method
104     public void init() {
105         dataChangeListenerRegistration = registerDataTreeChangeListener(topologyId);
106     }
107
108     @Override
109     public void onDataTreeChanged(@Nonnull final Collection<DataTreeModification<Node>> changes) {
110         for (final DataTreeModification<Node> change : changes) {
111             final DataObjectModification<Node> rootNode = change.getRootNode();
112             final InstanceIdentifier<Node> dataModifIdent = change.getRootPath().getRootIdentifier();
113             final NodeId nodeId = NetconfTopologyUtils.getNodeId(rootNode.getIdentifier());
114             switch (rootNode.getModificationType()) {
115                 case SUBTREE_MODIFIED:
116                     LOG.debug("Config for node {} updated", nodeId);
117                     refreshNetconfDeviceContext(dataModifIdent, rootNode.getDataAfter());
118                     break;
119                 case WRITE:
120                     if (contexts.containsKey(dataModifIdent)) {
121                         LOG.debug("RemoteDevice{{}} was already configured, reconfiguring node...", nodeId);
122                         refreshNetconfDeviceContext(dataModifIdent, rootNode.getDataAfter());
123                     } else {
124                         LOG.debug("Config for node {} created", nodeId);
125                         startNetconfDeviceContext(dataModifIdent, rootNode.getDataAfter());
126                     }
127                     break;
128                 case DELETE:
129                     LOG.debug("Config for node {} deleted", nodeId);
130                     stopNetconfDeviceContext(dataModifIdent);
131                     break;
132                 default:
133                     LOG.warn("Unknown operation for {}.", nodeId);
134             }
135         }
136     }
137
138     private void refreshNetconfDeviceContext(final InstanceIdentifier<Node> instanceIdentifier, final Node node) {
139         final NetconfTopologyContext context = contexts.get(instanceIdentifier);
140         context.refresh(createSetup(instanceIdentifier, node));
141     }
142
143     private void startNetconfDeviceContext(final InstanceIdentifier<Node> instanceIdentifier, final Node node) {
144         final NetconfNode netconfNode = node.getAugmentation(NetconfNode.class);
145         Preconditions.checkNotNull(netconfNode);
146         Preconditions.checkNotNull(netconfNode.getHost());
147         Preconditions.checkNotNull(netconfNode.getHost().getIpAddress());
148
149         final Timeout actorResponseWaitTime = new Timeout(Duration.create(netconfNode.getActorResponseWaitTime(),
150                 "seconds"));
151
152         final ServiceGroupIdentifier serviceGroupIdent =
153                 ServiceGroupIdentifier.create(instanceIdentifier.toString());
154
155         final NetconfTopologyContext newNetconfTopologyContext =
156                 new NetconfTopologyContext(createSetup(instanceIdentifier, node), serviceGroupIdent,
157                         actorResponseWaitTime);
158
159         final ClusterSingletonServiceRegistration clusterSingletonServiceRegistration  =
160                 clusterSingletonServiceProvider.registerClusterSingletonService(newNetconfTopologyContext);
161
162         clusterRegistrations.put(instanceIdentifier, clusterSingletonServiceRegistration);
163         contexts.put(instanceIdentifier, newNetconfTopologyContext);
164     }
165
166     private void stopNetconfDeviceContext(final InstanceIdentifier<Node> instanceIdentifier) {
167         if (contexts.containsKey(instanceIdentifier)) {
168             try {
169                 clusterRegistrations.get(instanceIdentifier).close();
170                 contexts.get(instanceIdentifier).closeFinal();
171             } catch (final Exception e) {
172                 LOG.warn("Error at closing topology context. InstanceIdentifier: " + instanceIdentifier);
173             }
174             contexts.remove(instanceIdentifier);
175             clusterRegistrations.remove(instanceIdentifier);
176         }
177     }
178
179     @Override
180     public void close() {
181         if (dataChangeListenerRegistration != null) {
182             dataChangeListenerRegistration.close();
183             dataChangeListenerRegistration = null;
184         }
185         contexts.forEach((instanceIdentifier, netconfTopologyContext) -> {
186             try {
187                 netconfTopologyContext.closeFinal();
188             } catch (final Exception e) {
189                 LOG.error("Error at closing topology context. InstanceIdentifier: " + instanceIdentifier, e);
190             }
191         });
192         clusterRegistrations.forEach((instanceIdentifier, clusterSingletonServiceRegistration) -> {
193             try {
194                 clusterSingletonServiceRegistration.close();
195             } catch (final Exception e) {
196                 LOG.error("Error at unregistering from cluster. InstanceIdentifier: " + instanceIdentifier, e);
197             }
198         });
199         contexts.clear();
200         clusterRegistrations.clear();
201     }
202
203     private ListenerRegistration<NetconfTopologyManager> registerDataTreeChangeListener(final String topologyId) {
204         final WriteTransaction wtx = dataBroker.newWriteOnlyTransaction();
205         initTopology(wtx, LogicalDatastoreType.CONFIGURATION, topologyId);
206         initTopology(wtx, LogicalDatastoreType.OPERATIONAL, topologyId);
207         Futures.addCallback(wtx.submit(), new FutureCallback<Void>() {
208             @Override
209             public void onSuccess(final Void result) {
210                 LOG.debug("topology initialization successful");
211             }
212
213             @Override
214             public void onFailure(@Nonnull final Throwable throwable) {
215                 LOG.error("Unable to initialize netconf-topology, {}", throwable);
216             }
217         });
218
219         LOG.debug("Registering datastore listener");
220         return dataBroker.registerDataTreeChangeListener(
221                         new DataTreeIdentifier<>(LogicalDatastoreType.CONFIGURATION,
222                                 NetconfTopologyUtils.createTopologyListPath(topologyId).child(Node.class)), this);
223     }
224
225     private void initTopology(final WriteTransaction wtx, final LogicalDatastoreType datastoreType, final String topologyId) {
226         final NetworkTopology networkTopology = new NetworkTopologyBuilder().build();
227         final InstanceIdentifier<NetworkTopology> networkTopologyId =
228                 InstanceIdentifier.builder(NetworkTopology.class).build();
229         wtx.merge(datastoreType, networkTopologyId, networkTopology);
230         final Topology topology = new TopologyBuilder().setTopologyId(new TopologyId(topologyId)).build();
231         wtx.merge(datastoreType, networkTopologyId.child(Topology.class,
232                 new TopologyKey(new TopologyId(topologyId))), topology);
233     }
234
235     private NetconfTopologySetup createSetup(final InstanceIdentifier<Node> instanceIdentifier, final Node node) {
236         final NetconfTopologySetupBuilder builder = NetconfTopologySetupBuilder.create()
237                 .setClusterSingletonServiceProvider(clusterSingletonServiceProvider)
238                 .setDataBroker(dataBroker)
239                 .setInstanceIdentifier(instanceIdentifier)
240                 .setRpcProviderRegistry(rpcProviderRegistry)
241                 .setNode(node)
242                 .setBindingAwareBroker(bindingAwareBroker)
243                 .setActorSystem(actorSystem)
244                 .setEventExecutor(eventExecutor)
245                 .setDomBroker(domBroker)
246                 .setKeepaliveExecutor(keepaliveExecutor)
247                 .setProcessingExecutor(processingExecutor)
248                 .setTopologyId(topologyId)
249                 .setNetconfClientDispatcher(clientDispatcher)
250                 .setSchemaResourceDTO(NetconfTopologyUtils.setupSchemaCacheDTO(node))
251                 .setIdleTimeout(writeTxIdleTimeout);
252
253         return builder.build();
254     }
255 }