Bug 8032 - Initialization in sal failed, disconnecting from device
[netconf.git] / netconf / netconf-topology-singleton / src / main / java / org / opendaylight / netconf / topology / singleton / impl / NetconfTopologyManager.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.netconf.topology.singleton.impl;
10
11 import akka.actor.ActorSystem;
12 import akka.util.Timeout;
13 import com.google.common.base.Preconditions;
14 import com.google.common.util.concurrent.FutureCallback;
15 import com.google.common.util.concurrent.Futures;
16 import io.netty.util.concurrent.EventExecutor;
17 import java.util.Collection;
18 import java.util.HashMap;
19 import java.util.Map;
20 import java.util.concurrent.TimeUnit;
21 import javax.annotation.Nonnull;
22 import org.opendaylight.controller.cluster.ActorSystemProvider;
23 import org.opendaylight.controller.config.threadpool.ScheduledThreadPool;
24 import org.opendaylight.controller.config.threadpool.ThreadPool;
25 import org.opendaylight.controller.md.sal.binding.api.ClusteredDataTreeChangeListener;
26 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
27 import org.opendaylight.controller.md.sal.binding.api.DataObjectModification;
28 import org.opendaylight.controller.md.sal.binding.api.DataTreeIdentifier;
29 import org.opendaylight.controller.md.sal.binding.api.DataTreeModification;
30 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
31 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
32 import org.opendaylight.controller.md.sal.dom.api.DOMMountPointService;
33 import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
34 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
35 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceRegistration;
36 import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier;
37 import org.opendaylight.netconf.client.NetconfClientDispatcher;
38 import org.opendaylight.netconf.topology.singleton.api.NetconfTopologySingletonService;
39 import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup;
40 import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup.NetconfTopologySetupBuilder;
41 import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologyUtils;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNode;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.topology.singleton.config.rev170419.Config;
44 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology;
45 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopologyBuilder;
46 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
47 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.TopologyId;
48 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
49 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyBuilder;
50 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey;
51 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
52 import org.opendaylight.yangtools.concepts.ListenerRegistration;
53 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
54 import org.slf4j.Logger;
55 import org.slf4j.LoggerFactory;
56 import scala.concurrent.duration.Duration;
57
58 public class NetconfTopologyManager
59         implements ClusteredDataTreeChangeListener<Node>, NetconfTopologySingletonService, AutoCloseable {
60
61     private static final Logger LOG = LoggerFactory.getLogger(NetconfTopologyManager.class);
62
63     private final Map<InstanceIdentifier<Node>, NetconfTopologyContext> contexts = new HashMap<>();
64     private final Map<InstanceIdentifier<Node>, ClusterSingletonServiceRegistration>
65             clusterRegistrations = new HashMap<>();
66
67     private final DataBroker dataBroker;
68     private final RpcProviderRegistry rpcProviderRegistry;
69     private final ClusterSingletonServiceProvider clusterSingletonServiceProvider;
70     private final ScheduledThreadPool keepaliveExecutor;
71     private final ThreadPool processingExecutor;
72     private final ActorSystem actorSystem;
73     private final EventExecutor eventExecutor;
74     private final NetconfClientDispatcher clientDispatcher;
75     private final String topologyId;
76     private final Duration writeTxIdleTimeout;
77     private final DOMMountPointService mountPointService;
78
79     private ListenerRegistration<NetconfTopologyManager> dataChangeListenerRegistration;
80
81     public NetconfTopologyManager(final DataBroker dataBroker, final RpcProviderRegistry rpcProviderRegistry,
82                                   final ClusterSingletonServiceProvider clusterSingletonServiceProvider,
83                                   final ScheduledThreadPool keepaliveExecutor, final ThreadPool processingExecutor,
84                                   final ActorSystemProvider actorSystemProvider,
85                                   final EventExecutor eventExecutor, final NetconfClientDispatcher clientDispatcher,
86                                   final String topologyId, final Config config,
87                                   final DOMMountPointService mountPointService) {
88         this.dataBroker = Preconditions.checkNotNull(dataBroker);
89         this.rpcProviderRegistry = Preconditions.checkNotNull(rpcProviderRegistry);
90         this.clusterSingletonServiceProvider = Preconditions.checkNotNull(clusterSingletonServiceProvider);
91         this.keepaliveExecutor = Preconditions.checkNotNull(keepaliveExecutor);
92         this.processingExecutor = Preconditions.checkNotNull(processingExecutor);
93         this.actorSystem = Preconditions.checkNotNull(actorSystemProvider).getActorSystem();
94         this.eventExecutor = Preconditions.checkNotNull(eventExecutor);
95         this.clientDispatcher = Preconditions.checkNotNull(clientDispatcher);
96         this.topologyId = Preconditions.checkNotNull(topologyId);
97         this.writeTxIdleTimeout = Duration.apply(config.getWriteTransactionIdleTimeout(), TimeUnit.SECONDS);
98         this.mountPointService = mountPointService;
99     }
100
101     // Blueprint init method
102     public void init() {
103         dataChangeListenerRegistration = registerDataTreeChangeListener(topologyId);
104     }
105
106     @Override
107     public void onDataTreeChanged(@Nonnull final Collection<DataTreeModification<Node>> changes) {
108         for (final DataTreeModification<Node> change : changes) {
109             final DataObjectModification<Node> rootNode = change.getRootNode();
110             final InstanceIdentifier<Node> dataModifIdent = change.getRootPath().getRootIdentifier();
111             final NodeId nodeId = NetconfTopologyUtils.getNodeId(rootNode.getIdentifier());
112             switch (rootNode.getModificationType()) {
113                 case SUBTREE_MODIFIED:
114                     LOG.debug("Config for node {} updated", nodeId);
115                     refreshNetconfDeviceContext(dataModifIdent, rootNode.getDataAfter());
116                     break;
117                 case WRITE:
118                     if (contexts.containsKey(dataModifIdent)) {
119                         LOG.debug("RemoteDevice{{}} was already configured, reconfiguring node...", nodeId);
120                         refreshNetconfDeviceContext(dataModifIdent, rootNode.getDataAfter());
121                     } else {
122                         LOG.debug("Config for node {} created", nodeId);
123                         startNetconfDeviceContext(dataModifIdent, rootNode.getDataAfter());
124                     }
125                     break;
126                 case DELETE:
127                     LOG.debug("Config for node {} deleted", nodeId);
128                     stopNetconfDeviceContext(dataModifIdent);
129                     break;
130                 default:
131                     LOG.warn("Unknown operation for {}.", nodeId);
132             }
133         }
134     }
135
136     private void refreshNetconfDeviceContext(final InstanceIdentifier<Node> instanceIdentifier, final Node node) {
137         final NetconfTopologyContext context = contexts.get(instanceIdentifier);
138         context.refresh(createSetup(instanceIdentifier, node));
139     }
140
141     private void startNetconfDeviceContext(final InstanceIdentifier<Node> instanceIdentifier, final Node node) {
142         final NetconfNode netconfNode = node.getAugmentation(NetconfNode.class);
143         Preconditions.checkNotNull(netconfNode);
144         Preconditions.checkNotNull(netconfNode.getHost());
145         Preconditions.checkNotNull(netconfNode.getHost().getIpAddress());
146
147         final Timeout actorResponseWaitTime = new Timeout(Duration.create(netconfNode.getActorResponseWaitTime(),
148                 "seconds"));
149
150         final ServiceGroupIdentifier serviceGroupIdent =
151                 ServiceGroupIdentifier.create(instanceIdentifier.toString());
152
153         final NetconfTopologyContext newNetconfTopologyContext =
154                 new NetconfTopologyContext(createSetup(instanceIdentifier, node), serviceGroupIdent,
155                         actorResponseWaitTime, mountPointService);
156
157         final ClusterSingletonServiceRegistration clusterSingletonServiceRegistration =
158                 clusterSingletonServiceProvider.registerClusterSingletonService(newNetconfTopologyContext);
159
160         clusterRegistrations.put(instanceIdentifier, clusterSingletonServiceRegistration);
161         contexts.put(instanceIdentifier, newNetconfTopologyContext);
162     }
163
164     private void stopNetconfDeviceContext(final InstanceIdentifier<Node> instanceIdentifier) {
165         if (contexts.containsKey(instanceIdentifier)) {
166             try {
167                 clusterRegistrations.get(instanceIdentifier).close();
168                 contexts.get(instanceIdentifier).closeFinal();
169             } catch (final Exception e) {
170                 LOG.warn("Error at closing topology context. InstanceIdentifier: " + instanceIdentifier);
171             }
172             contexts.remove(instanceIdentifier);
173             clusterRegistrations.remove(instanceIdentifier);
174         }
175     }
176
177     @Override
178     public void close() {
179         if (dataChangeListenerRegistration != null) {
180             dataChangeListenerRegistration.close();
181             dataChangeListenerRegistration = null;
182         }
183         contexts.forEach((instanceIdentifier, netconfTopologyContext) -> {
184             try {
185                 netconfTopologyContext.closeFinal();
186             } catch (final Exception e) {
187                 LOG.error("Error at closing topology context. InstanceIdentifier: " + instanceIdentifier, e);
188             }
189         });
190         clusterRegistrations.forEach((instanceIdentifier, clusterSingletonServiceRegistration) -> {
191             try {
192                 clusterSingletonServiceRegistration.close();
193             } catch (final Exception e) {
194                 LOG.error("Error at unregistering from cluster. InstanceIdentifier: " + instanceIdentifier, e);
195             }
196         });
197         contexts.clear();
198         clusterRegistrations.clear();
199     }
200
201     private ListenerRegistration<NetconfTopologyManager> registerDataTreeChangeListener(final String topologyId) {
202         final WriteTransaction wtx = dataBroker.newWriteOnlyTransaction();
203         initTopology(wtx, LogicalDatastoreType.CONFIGURATION, topologyId);
204         initTopology(wtx, LogicalDatastoreType.OPERATIONAL, topologyId);
205         Futures.addCallback(wtx.submit(), new FutureCallback<Void>() {
206             @Override
207             public void onSuccess(final Void result) {
208                 LOG.debug("topology initialization successful");
209             }
210
211             @Override
212             public void onFailure(@Nonnull final Throwable throwable) {
213                 LOG.error("Unable to initialize netconf-topology, {}", throwable);
214             }
215         });
216
217         LOG.debug("Registering datastore listener");
218         return dataBroker.registerDataTreeChangeListener(
219                 new DataTreeIdentifier<>(LogicalDatastoreType.CONFIGURATION,
220                         NetconfTopologyUtils.createTopologyListPath(topologyId).child(Node.class)), this);
221     }
222
223     private void initTopology(final WriteTransaction wtx, final LogicalDatastoreType datastoreType, final String topologyId) {
224         final NetworkTopology networkTopology = new NetworkTopologyBuilder().build();
225         final InstanceIdentifier<NetworkTopology> networkTopologyId =
226                 InstanceIdentifier.builder(NetworkTopology.class).build();
227         wtx.merge(datastoreType, networkTopologyId, networkTopology);
228         final Topology topology = new TopologyBuilder().setTopologyId(new TopologyId(topologyId)).build();
229         wtx.merge(datastoreType, networkTopologyId.child(Topology.class,
230                 new TopologyKey(new TopologyId(topologyId))), topology);
231     }
232
233     private NetconfTopologySetup createSetup(final InstanceIdentifier<Node> instanceIdentifier, final Node node) {
234         final NetconfTopologySetupBuilder builder = NetconfTopologySetupBuilder.create()
235                 .setClusterSingletonServiceProvider(clusterSingletonServiceProvider)
236                 .setDataBroker(dataBroker)
237                 .setInstanceIdentifier(instanceIdentifier)
238                 .setRpcProviderRegistry(rpcProviderRegistry)
239                 .setNode(node)
240                 .setActorSystem(actorSystem)
241                 .setEventExecutor(eventExecutor)
242                 .setKeepaliveExecutor(keepaliveExecutor)
243                 .setProcessingExecutor(processingExecutor)
244                 .setTopologyId(topologyId)
245                 .setNetconfClientDispatcher(clientDispatcher)
246                 .setSchemaResourceDTO(NetconfTopologyUtils.setupSchemaCacheDTO(node))
247                 .setIdleTimeout(writeTxIdleTimeout);
248
249         return builder.build();
250     }
251 }