Resolve exception from registerClusterSingletonService
[netconf.git] / netconf / netconf-topology-singleton / src / main / java / org / opendaylight / netconf / topology / singleton / impl / NetconfTopologyManager.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.netconf.topology.singleton.impl;
10
11 import akka.actor.ActorSystem;
12 import akka.util.Timeout;
13 import com.google.common.base.Preconditions;
14 import com.google.common.util.concurrent.FutureCallback;
15 import com.google.common.util.concurrent.Futures;
16 import io.netty.util.concurrent.EventExecutor;
17 import java.util.Collection;
18 import java.util.HashMap;
19 import java.util.Map;
20 import java.util.concurrent.TimeUnit;
21 import javax.annotation.Nonnull;
22 import org.opendaylight.controller.cluster.ActorSystemProvider;
23 import org.opendaylight.controller.config.threadpool.ScheduledThreadPool;
24 import org.opendaylight.controller.config.threadpool.ThreadPool;
25 import org.opendaylight.controller.md.sal.binding.api.ClusteredDataTreeChangeListener;
26 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
27 import org.opendaylight.controller.md.sal.binding.api.DataObjectModification;
28 import org.opendaylight.controller.md.sal.binding.api.DataTreeIdentifier;
29 import org.opendaylight.controller.md.sal.binding.api.DataTreeModification;
30 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
31 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
32 import org.opendaylight.controller.md.sal.dom.api.DOMMountPointService;
33 import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
34 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
35 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceRegistration;
36 import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier;
37 import org.opendaylight.netconf.client.NetconfClientDispatcher;
38 import org.opendaylight.netconf.topology.singleton.api.NetconfTopologySingletonService;
39 import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup;
40 import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup.NetconfTopologySetupBuilder;
41 import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologyUtils;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNode;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.topology.singleton.config.rev170419.Config;
44 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology;
45 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopologyBuilder;
46 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
47 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.TopologyId;
48 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
49 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyBuilder;
50 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey;
51 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
52 import org.opendaylight.yangtools.concepts.ListenerRegistration;
53 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
54 import org.slf4j.Logger;
55 import org.slf4j.LoggerFactory;
56 import scala.concurrent.duration.Duration;
57
58 public class NetconfTopologyManager
59         implements ClusteredDataTreeChangeListener<Node>, NetconfTopologySingletonService, AutoCloseable {
60
61     private static final Logger LOG = LoggerFactory.getLogger(NetconfTopologyManager.class);
62
63     private final Map<InstanceIdentifier<Node>, NetconfTopologyContext> contexts = new HashMap<>();
64     private final Map<InstanceIdentifier<Node>, ClusterSingletonServiceRegistration>
65             clusterRegistrations = new HashMap<>();
66
67     private final DataBroker dataBroker;
68     private final RpcProviderRegistry rpcProviderRegistry;
69     private final ClusterSingletonServiceProvider clusterSingletonServiceProvider;
70     private final ScheduledThreadPool keepaliveExecutor;
71     private final ThreadPool processingExecutor;
72     private final ActorSystem actorSystem;
73     private final EventExecutor eventExecutor;
74     private final NetconfClientDispatcher clientDispatcher;
75     private final String topologyId;
76     private final Duration writeTxIdleTimeout;
77     private final DOMMountPointService mountPointService;
78
79     private ListenerRegistration<NetconfTopologyManager> dataChangeListenerRegistration;
80
81     public NetconfTopologyManager(final DataBroker dataBroker, final RpcProviderRegistry rpcProviderRegistry,
82                                   final ClusterSingletonServiceProvider clusterSingletonServiceProvider,
83                                   final ScheduledThreadPool keepaliveExecutor, final ThreadPool processingExecutor,
84                                   final ActorSystemProvider actorSystemProvider, final EventExecutor eventExecutor,
85                                   final NetconfClientDispatcher clientDispatcher, final String topologyId,
86                                   final Config config, final DOMMountPointService mountPointService) {
87         this.dataBroker = Preconditions.checkNotNull(dataBroker);
88         this.rpcProviderRegistry = Preconditions.checkNotNull(rpcProviderRegistry);
89         this.clusterSingletonServiceProvider = Preconditions.checkNotNull(clusterSingletonServiceProvider);
90         this.keepaliveExecutor = Preconditions.checkNotNull(keepaliveExecutor);
91         this.processingExecutor = Preconditions.checkNotNull(processingExecutor);
92         this.actorSystem = Preconditions.checkNotNull(actorSystemProvider).getActorSystem();
93         this.eventExecutor = Preconditions.checkNotNull(eventExecutor);
94         this.clientDispatcher = Preconditions.checkNotNull(clientDispatcher);
95         this.topologyId = Preconditions.checkNotNull(topologyId);
96         this.writeTxIdleTimeout = Duration.apply(config.getWriteTransactionIdleTimeout(), TimeUnit.SECONDS);
97         this.mountPointService = mountPointService;
98     }
99
100     // Blueprint init method
101     public void init() {
102         dataChangeListenerRegistration = registerDataTreeChangeListener(topologyId);
103     }
104
105     @Override
106     public void onDataTreeChanged(@Nonnull final Collection<DataTreeModification<Node>> changes) {
107         for (final DataTreeModification<Node> change : changes) {
108             final DataObjectModification<Node> rootNode = change.getRootNode();
109             final InstanceIdentifier<Node> dataModifIdent = change.getRootPath().getRootIdentifier();
110             final NodeId nodeId = NetconfTopologyUtils.getNodeId(rootNode.getIdentifier());
111             switch (rootNode.getModificationType()) {
112                 case SUBTREE_MODIFIED:
113                     LOG.debug("Config for node {} updated", nodeId);
114                     refreshNetconfDeviceContext(dataModifIdent, rootNode.getDataAfter());
115                     break;
116                 case WRITE:
117                     if (contexts.containsKey(dataModifIdent)) {
118                         LOG.debug("RemoteDevice{{}} was already configured, reconfiguring node...", nodeId);
119                         refreshNetconfDeviceContext(dataModifIdent, rootNode.getDataAfter());
120                     } else {
121                         LOG.debug("Config for node {} created", nodeId);
122                         startNetconfDeviceContext(dataModifIdent, rootNode.getDataAfter());
123                     }
124                     break;
125                 case DELETE:
126                     LOG.debug("Config for node {} deleted", nodeId);
127                     stopNetconfDeviceContext(dataModifIdent);
128                     break;
129                 default:
130                     LOG.warn("Unknown operation for {}.", nodeId);
131             }
132         }
133     }
134
135     private void refreshNetconfDeviceContext(final InstanceIdentifier<Node> instanceIdentifier, final Node node) {
136         final NetconfTopologyContext context = contexts.get(instanceIdentifier);
137         context.refresh(createSetup(instanceIdentifier, node));
138     }
139
140     // ClusterSingletonServiceRegistration registerClusterSingletonService method throws a Runtime exception if there
141     // are problems with registration and client has to deal with it. Only thing we can do if this error occurs is to
142     // retry registration several times and log the error.
143     // TODO change to a specific documented Exception when changed in ClusterSingletonServiceProvider
144     @SuppressWarnings("checkstyle:IllegalCatch")
145     private void startNetconfDeviceContext(final InstanceIdentifier<Node> instanceIdentifier, final Node node) {
146         final NetconfNode netconfNode = node.getAugmentation(NetconfNode.class);
147         Preconditions.checkNotNull(netconfNode);
148         Preconditions.checkNotNull(netconfNode.getHost());
149         Preconditions.checkNotNull(netconfNode.getHost().getIpAddress());
150
151         final Timeout actorResponseWaitTime = new Timeout(Duration.create(netconfNode.getActorResponseWaitTime(),
152                 "seconds"));
153
154         final ServiceGroupIdentifier serviceGroupIdent =
155                 ServiceGroupIdentifier.create(instanceIdentifier.toString());
156
157         final NetconfTopologyContext newNetconfTopologyContext =
158                 new NetconfTopologyContext(createSetup(instanceIdentifier, node), serviceGroupIdent,
159                         actorResponseWaitTime, mountPointService);
160
161         int tries = 3;
162         while (true) {
163             try {
164                 final ClusterSingletonServiceRegistration clusterSingletonServiceRegistration =
165                         clusterSingletonServiceProvider.registerClusterSingletonService(newNetconfTopologyContext);
166                 clusterRegistrations.put(instanceIdentifier, clusterSingletonServiceRegistration);
167                 contexts.put(instanceIdentifier, newNetconfTopologyContext);
168                 break;
169             } catch (final RuntimeException e) {
170                 LOG.warn("Unable to register cluster singleton service {}, trying again", newNetconfTopologyContext, e);
171
172                 if (--tries <= 0) {
173                     LOG.error("Unable to register cluster singleton service {} - done trying, closing topology context",
174                             newNetconfTopologyContext, e);
175                     close();
176                     break;
177                 }
178             }
179         }
180
181     }
182
183     private void stopNetconfDeviceContext(final InstanceIdentifier<Node> instanceIdentifier) {
184         if (contexts.containsKey(instanceIdentifier)) {
185             try {
186                 clusterRegistrations.get(instanceIdentifier).close();
187                 contexts.get(instanceIdentifier).closeFinal();
188             } catch (final Exception e) {
189                 LOG.warn("Error at closing topology context. InstanceIdentifier: " + instanceIdentifier);
190             }
191             contexts.remove(instanceIdentifier);
192             clusterRegistrations.remove(instanceIdentifier);
193         }
194     }
195
196     @Override
197     public void close() {
198         if (dataChangeListenerRegistration != null) {
199             dataChangeListenerRegistration.close();
200             dataChangeListenerRegistration = null;
201         }
202         contexts.forEach((instanceIdentifier, netconfTopologyContext) -> {
203             try {
204                 netconfTopologyContext.closeFinal();
205             } catch (final Exception e) {
206                 LOG.error("Error at closing topology context. InstanceIdentifier: " + instanceIdentifier, e);
207             }
208         });
209         clusterRegistrations.forEach((instanceIdentifier, clusterSingletonServiceRegistration) -> {
210             try {
211                 clusterSingletonServiceRegistration.close();
212             } catch (final Exception e) {
213                 LOG.error("Error at unregistering from cluster. InstanceIdentifier: " + instanceIdentifier, e);
214             }
215         });
216         contexts.clear();
217         clusterRegistrations.clear();
218     }
219
220     private ListenerRegistration<NetconfTopologyManager> registerDataTreeChangeListener(final String topologyId) {
221         final WriteTransaction wtx = dataBroker.newWriteOnlyTransaction();
222         initTopology(wtx, LogicalDatastoreType.CONFIGURATION, topologyId);
223         initTopology(wtx, LogicalDatastoreType.OPERATIONAL, topologyId);
224         Futures.addCallback(wtx.submit(), new FutureCallback<Void>() {
225             @Override
226             public void onSuccess(final Void result) {
227                 LOG.debug("topology initialization successful");
228             }
229
230             @Override
231             public void onFailure(@Nonnull final Throwable throwable) {
232                 LOG.error("Unable to initialize netconf-topology, {}", throwable);
233             }
234         });
235
236         LOG.debug("Registering datastore listener");
237         return dataBroker.registerDataTreeChangeListener(
238                 new DataTreeIdentifier<>(LogicalDatastoreType.CONFIGURATION,
239                         NetconfTopologyUtils.createTopologyListPath(topologyId).child(Node.class)), this);
240     }
241
242     private void initTopology(final WriteTransaction wtx, final LogicalDatastoreType datastoreType, final String topologyId) {
243         final NetworkTopology networkTopology = new NetworkTopologyBuilder().build();
244         final InstanceIdentifier<NetworkTopology> networkTopologyId =
245                 InstanceIdentifier.builder(NetworkTopology.class).build();
246         wtx.merge(datastoreType, networkTopologyId, networkTopology);
247         final Topology topology = new TopologyBuilder().setTopologyId(new TopologyId(topologyId)).build();
248         wtx.merge(datastoreType, networkTopologyId.child(Topology.class,
249                 new TopologyKey(new TopologyId(topologyId))), topology);
250     }
251
252     private NetconfTopologySetup createSetup(final InstanceIdentifier<Node> instanceIdentifier, final Node node) {
253         final NetconfTopologySetupBuilder builder = NetconfTopologySetupBuilder.create()
254                 .setClusterSingletonServiceProvider(clusterSingletonServiceProvider)
255                 .setDataBroker(dataBroker)
256                 .setInstanceIdentifier(instanceIdentifier)
257                 .setRpcProviderRegistry(rpcProviderRegistry)
258                 .setNode(node)
259                 .setActorSystem(actorSystem)
260                 .setEventExecutor(eventExecutor)
261                 .setKeepaliveExecutor(keepaliveExecutor)
262                 .setProcessingExecutor(processingExecutor)
263                 .setTopologyId(topologyId)
264                 .setNetconfClientDispatcher(clientDispatcher)
265                 .setSchemaResourceDTO(NetconfTopologyUtils.setupSchemaCacheDTO(node))
266                 .setIdleTimeout(writeTxIdleTimeout);
267
268         return builder.build();
269     }
270 }