Fix Broken refresh in network-topology-singleton
[netconf.git] / apps / netconf-topology-singleton / src / main / java / org / opendaylight / netconf / topology / singleton / impl / NetconfTopologyManager.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.netconf.topology.singleton.impl;
9
10 import static java.util.Objects.requireNonNull;
11
12 import akka.actor.ActorSystem;
13 import akka.util.Timeout;
14 import com.google.common.annotations.VisibleForTesting;
15 import com.google.common.util.concurrent.FutureCallback;
16 import com.google.common.util.concurrent.ListeningExecutorService;
17 import com.google.common.util.concurrent.MoreExecutors;
18 import io.netty.util.concurrent.EventExecutor;
19 import java.time.Duration;
20 import java.util.Collection;
21 import java.util.Map;
22 import java.util.concurrent.ConcurrentHashMap;
23 import java.util.concurrent.ScheduledExecutorService;
24 import org.opendaylight.aaa.encrypt.AAAEncryptionService;
25 import org.opendaylight.controller.cluster.ActorSystemProvider;
26 import org.opendaylight.controller.config.threadpool.ScheduledThreadPool;
27 import org.opendaylight.controller.config.threadpool.ThreadPool;
28 import org.opendaylight.mdsal.binding.api.ClusteredDataTreeChangeListener;
29 import org.opendaylight.mdsal.binding.api.DataBroker;
30 import org.opendaylight.mdsal.binding.api.DataObjectModification;
31 import org.opendaylight.mdsal.binding.api.DataTreeIdentifier;
32 import org.opendaylight.mdsal.binding.api.DataTreeModification;
33 import org.opendaylight.mdsal.binding.api.RpcProviderService;
34 import org.opendaylight.mdsal.binding.api.WriteTransaction;
35 import org.opendaylight.mdsal.common.api.CommitInfo;
36 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
37 import org.opendaylight.mdsal.dom.api.DOMActionProviderService;
38 import org.opendaylight.mdsal.dom.api.DOMMountPointService;
39 import org.opendaylight.mdsal.dom.api.DOMRpcProviderService;
40 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
41 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceRegistration;
42 import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier;
43 import org.opendaylight.netconf.client.NetconfClientDispatcher;
44 import org.opendaylight.netconf.client.mdsal.api.BaseNetconfSchemas;
45 import org.opendaylight.netconf.client.mdsal.api.CredentialProvider;
46 import org.opendaylight.netconf.client.mdsal.api.DeviceActionFactory;
47 import org.opendaylight.netconf.client.mdsal.api.RemoteDeviceId;
48 import org.opendaylight.netconf.client.mdsal.api.SchemaResourceManager;
49 import org.opendaylight.netconf.client.mdsal.api.SslHandlerFactoryProvider;
50 import org.opendaylight.netconf.topology.singleton.api.NetconfTopologySingletonService;
51 import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup;
52 import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup.NetconfTopologySetupBuilder;
53 import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologyUtils;
54 import org.opendaylight.netconf.topology.spi.NetconfNodeUtils;
55 import org.opendaylight.netconf.topology.spi.NetconfTopologyRPCProvider;
56 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev221225.NetconfNode;
57 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev221225.NetconfNodeTopologyService;
58 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.topology.singleton.config.rev170419.Config;
59 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology;
60 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
61 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.TopologyId;
62 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
63 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyBuilder;
64 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey;
65 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
66 import org.opendaylight.yangtools.concepts.ListenerRegistration;
67 import org.opendaylight.yangtools.concepts.Registration;
68 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
69 import org.slf4j.Logger;
70 import org.slf4j.LoggerFactory;
71
72 public class NetconfTopologyManager
73         implements ClusteredDataTreeChangeListener<Node>, NetconfTopologySingletonService, AutoCloseable {
74
75     private static final Logger LOG = LoggerFactory.getLogger(NetconfTopologyManager.class);
76
77     private final Map<InstanceIdentifier<Node>, NetconfTopologyContext> contexts = new ConcurrentHashMap<>();
78     private final Map<InstanceIdentifier<Node>, ClusterSingletonServiceRegistration>
79             clusterRegistrations = new ConcurrentHashMap<>();
80
81     private final BaseNetconfSchemas baseSchemas;
82     private final DataBroker dataBroker;
83     private final DOMRpcProviderService rpcProviderRegistry;
84     private final DOMActionProviderService actionProviderRegistry;
85     private final ClusterSingletonServiceProvider clusterSingletonServiceProvider;
86     private final ScheduledThreadPool keepaliveExecutor;
87     private final ScheduledExecutorService keepaliveExecutorService;
88     private final ThreadPool processingExecutor;
89     private final ListeningExecutorService processingExecutorService;
90     private final ActorSystem actorSystem;
91     private final EventExecutor eventExecutor;
92     private final NetconfClientDispatcher clientDispatcher;
93     private final String topologyId;
94     private final Duration writeTxIdleTimeout;
95     private final DOMMountPointService mountPointService;
96     private final AAAEncryptionService encryptionService;
97     private final RpcProviderService rpcProviderService;
98     private final DeviceActionFactory deviceActionFactory;
99     private final CredentialProvider credentialProvider;
100     private final SslHandlerFactoryProvider sslHandlerFactoryProvider;
101     private final SchemaResourceManager resourceManager;
102
103     private ListenerRegistration<NetconfTopologyManager> dataChangeListenerRegistration;
104     private Registration rpcReg;
105
106     public NetconfTopologyManager(final BaseNetconfSchemas baseSchemas, final DataBroker dataBroker,
107                                   final DOMRpcProviderService rpcProviderRegistry,
108                                   final DOMActionProviderService actionProviderService,
109                                   final ClusterSingletonServiceProvider clusterSingletonServiceProvider,
110                                   final ScheduledThreadPool keepaliveExecutor, final ThreadPool processingExecutor,
111                                   final ActorSystemProvider actorSystemProvider,
112                                   final EventExecutor eventExecutor, final NetconfClientDispatcher clientDispatcher,
113                                   final String topologyId, final Config config,
114                                   final DOMMountPointService mountPointService,
115                                   final AAAEncryptionService encryptionService,
116                                   final RpcProviderService rpcProviderService,
117                                   final DeviceActionFactory deviceActionFactory,
118                                   final SchemaResourceManager resourceManager,
119                                   final CredentialProvider credentialProvider,
120                                   final SslHandlerFactoryProvider sslHandlerFactoryProvider) {
121         this.baseSchemas = requireNonNull(baseSchemas);
122         this.dataBroker = requireNonNull(dataBroker);
123         this.rpcProviderRegistry = requireNonNull(rpcProviderRegistry);
124         actionProviderRegistry = requireNonNull(actionProviderService);
125         this.clusterSingletonServiceProvider = requireNonNull(clusterSingletonServiceProvider);
126         this.keepaliveExecutor = keepaliveExecutor;
127         this.keepaliveExecutorService = keepaliveExecutor.getExecutor();
128         this.processingExecutor = processingExecutor;
129         this.processingExecutorService = MoreExecutors.listeningDecorator(processingExecutor.getExecutor());
130         actorSystem = requireNonNull(actorSystemProvider).getActorSystem();
131         this.eventExecutor = requireNonNull(eventExecutor);
132         this.clientDispatcher = requireNonNull(clientDispatcher);
133         this.topologyId = requireNonNull(topologyId);
134         writeTxIdleTimeout = Duration.ofSeconds(config.getWriteTransactionIdleTimeout().toJava());
135         this.mountPointService = mountPointService;
136         this.encryptionService = requireNonNull(encryptionService);
137         this.rpcProviderService = requireNonNull(rpcProviderService);
138         this.deviceActionFactory = requireNonNull(deviceActionFactory);
139         this.resourceManager = requireNonNull(resourceManager);
140         this.credentialProvider = requireNonNull(credentialProvider);
141         this.sslHandlerFactoryProvider = requireNonNull(sslHandlerFactoryProvider);
142     }
143
144     // Blueprint init method
145     public void init() {
146         dataChangeListenerRegistration = registerDataTreeChangeListener();
147         rpcReg = rpcProviderService.registerRpcImplementation(NetconfNodeTopologyService.class,
148             new NetconfTopologyRPCProvider(dataBroker, encryptionService, topologyId));
149     }
150
151     @Override
152     public void onDataTreeChanged(final Collection<DataTreeModification<Node>> changes) {
153         for (final DataTreeModification<Node> change : changes) {
154             final DataObjectModification<Node> rootNode = change.getRootNode();
155             final InstanceIdentifier<Node> dataModifIdent = change.getRootPath().getRootIdentifier();
156             final NodeId nodeId = NetconfTopologyUtils.getNodeId(rootNode.getIdentifier());
157             switch (rootNode.getModificationType()) {
158                 case SUBTREE_MODIFIED:
159                     LOG.debug("Config for node {} updated", nodeId);
160                     refreshNetconfDeviceContext(dataModifIdent, rootNode.getDataAfter());
161                     break;
162                 case WRITE:
163                     if (contexts.containsKey(dataModifIdent)) {
164                         LOG.debug("RemoteDevice{{}} was already configured, reconfiguring node...", nodeId);
165                         refreshNetconfDeviceContext(dataModifIdent, rootNode.getDataAfter());
166                     } else {
167                         LOG.debug("Config for node {} created", nodeId);
168                         startNetconfDeviceContext(dataModifIdent, rootNode.getDataAfter());
169                     }
170                     break;
171                 case DELETE:
172                     LOG.debug("Config for node {} deleted", nodeId);
173                     stopNetconfDeviceContext(dataModifIdent);
174                     break;
175                 default:
176                     LOG.warn("Unknown operation for {}.", nodeId);
177             }
178         }
179     }
180
181     private void refreshNetconfDeviceContext(final InstanceIdentifier<Node> instanceIdentifier, final Node node) {
182         final NetconfTopologyContext context = contexts.get(instanceIdentifier);
183         context.refresh(createSetup(instanceIdentifier, node));
184     }
185
186     // ClusterSingletonServiceRegistration registerClusterSingletonService method throws a Runtime exception if there
187     // are problems with registration and client has to deal with it. Only thing we can do if this error occurs is to
188     // retry registration several times and log the error.
189     // TODO change to a specific documented Exception when changed in ClusterSingletonServiceProvider
190     @SuppressWarnings("checkstyle:IllegalCatch")
191     private void startNetconfDeviceContext(final InstanceIdentifier<Node> instanceIdentifier, final Node node) {
192         final NetconfNode netconfNode = requireNonNull(node.augmentation(NetconfNode.class));
193
194         final Timeout actorResponseWaitTime = Timeout.create(
195                 Duration.ofSeconds(netconfNode.getActorResponseWaitTime().toJava()));
196
197         final ServiceGroupIdentifier serviceGroupIdent =
198                 ServiceGroupIdentifier.create(instanceIdentifier.toString());
199
200         final NetconfTopologyContext newNetconfTopologyContext = newNetconfTopologyContext(
201             createSetup(instanceIdentifier, node), serviceGroupIdent, actorResponseWaitTime, deviceActionFactory);
202
203         int tries = 3;
204         while (true) {
205             try {
206                 final ClusterSingletonServiceRegistration clusterSingletonServiceRegistration =
207                         clusterSingletonServiceProvider.registerClusterSingletonService(newNetconfTopologyContext);
208                 clusterRegistrations.put(instanceIdentifier, clusterSingletonServiceRegistration);
209                 contexts.put(instanceIdentifier, newNetconfTopologyContext);
210                 break;
211             } catch (final RuntimeException e) {
212                 LOG.warn("Unable to register cluster singleton service {}, trying again", newNetconfTopologyContext, e);
213
214                 if (--tries <= 0) {
215                     LOG.error("Unable to register cluster singleton service {} - done trying, closing topology context",
216                             newNetconfTopologyContext, e);
217                     close(newNetconfTopologyContext);
218                     break;
219                 }
220             }
221         }
222     }
223
224     private void stopNetconfDeviceContext(final InstanceIdentifier<Node> instanceIdentifier) {
225         final NetconfTopologyContext netconfTopologyContext = contexts.remove(instanceIdentifier);
226         if (netconfTopologyContext != null) {
227             close(clusterRegistrations.remove(instanceIdentifier));
228             close(netconfTopologyContext);
229         }
230     }
231
232     @VisibleForTesting
233     protected NetconfTopologyContext newNetconfTopologyContext(final NetconfTopologySetup setup,
234             final ServiceGroupIdentifier serviceGroupIdent, final Timeout actorResponseWaitTime,
235             final DeviceActionFactory deviceActionFact) {
236         return new NetconfTopologyContext(setup.getTopologyId(), setup.getNetconfClientDispatcher(),
237                 setup.getEventExecutor(), keepaliveExecutor,
238                 processingExecutor, resourceManager,
239                 dataBroker, mountPointService,
240                 encryptionService, deviceActionFactory,
241                 baseSchemas, actorResponseWaitTime,
242                 serviceGroupIdent, setup, credentialProvider, sslHandlerFactoryProvider);
243     }
244
245     @Override
246     public void close() {
247         if (rpcReg != null) {
248             rpcReg.close();
249             rpcReg = null;
250         }
251         if (dataChangeListenerRegistration != null) {
252             dataChangeListenerRegistration.close();
253             dataChangeListenerRegistration = null;
254         }
255
256         contexts.values().forEach(NetconfTopologyManager::close);
257         clusterRegistrations.values().forEach(NetconfTopologyManager::close);
258
259         contexts.clear();
260         clusterRegistrations.clear();
261     }
262
263     @SuppressWarnings("checkstyle:IllegalCatch")
264     private static void close(final AutoCloseable closeable) {
265         try {
266             closeable.close();
267         } catch (Exception e) {
268             LOG.warn("Error closing {}", closeable, e);
269         }
270     }
271
272     private ListenerRegistration<NetconfTopologyManager> registerDataTreeChangeListener() {
273         final WriteTransaction wtx = dataBroker.newWriteOnlyTransaction();
274         // FIXME: how does this play out with lifecycle? In a cluster, someone needs to ensure this call happens, but
275         //        also we need to to make sure config -> oper is properly synchronized. Non-clustered case relies on
276         //        oper being transient and perhaps on a put() instead, how do we handle that in the clustered case?
277         wtx.merge(LogicalDatastoreType.OPERATIONAL, InstanceIdentifier.builder(NetworkTopology.class)
278             .child(Topology.class, new TopologyKey(new TopologyId(topologyId)))
279             .build(), new TopologyBuilder().setTopologyId(new TopologyId(topologyId)).build());
280         wtx.commit().addCallback(new FutureCallback<CommitInfo>() {
281             @Override
282             public void onSuccess(final CommitInfo result) {
283                 LOG.debug("topology initialization successful");
284             }
285
286             @Override
287             public void onFailure(final Throwable throwable) {
288                 LOG.error("Unable to initialize netconf-topology", throwable);
289             }
290         }, MoreExecutors.directExecutor());
291
292         LOG.debug("Registering datastore listener");
293         return dataBroker.registerDataTreeChangeListener(DataTreeIdentifier.create(LogicalDatastoreType.CONFIGURATION,
294             NetconfTopologyUtils.createTopologyListPath(topologyId).child(Node.class)), this);
295     }
296
297     private NetconfTopologySetup createSetup(final InstanceIdentifier<Node> instanceIdentifier, final Node node) {
298         final NetconfNode netconfNode = node.augmentation(NetconfNode.class);
299         final RemoteDeviceId deviceId = NetconfNodeUtils.toRemoteDeviceId(node.getNodeId(), netconfNode);
300
301         return NetconfTopologySetupBuilder.create()
302                 .setClusterSingletonServiceProvider(clusterSingletonServiceProvider)
303                 .setBaseSchemas(baseSchemas)
304                 .setDataBroker(dataBroker)
305                 .setInstanceIdentifier(instanceIdentifier)
306                 .setRpcProviderRegistry(rpcProviderRegistry)
307                 .setActionProviderRegistry(actionProviderRegistry)
308                 .setNode(node)
309                 .setActorSystem(actorSystem)
310                 .setEventExecutor(eventExecutor)
311                 .setKeepaliveExecutor(keepaliveExecutorService)
312                 .setProcessingExecutor(processingExecutorService)
313                 .setTopologyId(topologyId)
314                 .setNetconfClientDispatcher(clientDispatcher)
315                 .setSchemaResourceDTO(resourceManager.getSchemaResources(netconfNode.getSchemaCacheDirectory(),
316                     deviceId))
317                 .setIdleTimeout(writeTxIdleTimeout)
318                 .setEncryptionService(encryptionService)
319                 .setCredentialProvider(credentialProvider)
320                 .setSslHandlerFactoryProvider(sslHandlerFactoryProvider)
321                 .build();
322     }
323 }