decd31e55455d4a2e0765108494a460609159304
[netconf.git] / netconf / netconf-topology-singleton / src / main / java / org / opendaylight / netconf / topology / singleton / impl / NetconfTopologyManager.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.netconf.topology.singleton.impl;
10
11 import akka.actor.ActorSystem;
12 import akka.util.Timeout;
13 import com.google.common.base.Preconditions;
14 import com.google.common.util.concurrent.FutureCallback;
15 import com.google.common.util.concurrent.Futures;
16 import io.netty.util.concurrent.EventExecutor;
17 import java.util.Collection;
18 import java.util.HashMap;
19 import java.util.Map;
20 import java.util.concurrent.TimeUnit;
21 import javax.annotation.Nonnull;
22 import org.opendaylight.aaa.encrypt.AAAEncryptionService;
23 import org.opendaylight.controller.cluster.ActorSystemProvider;
24 import org.opendaylight.controller.config.threadpool.ScheduledThreadPool;
25 import org.opendaylight.controller.config.threadpool.ThreadPool;
26 import org.opendaylight.controller.md.sal.binding.api.ClusteredDataTreeChangeListener;
27 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
28 import org.opendaylight.controller.md.sal.binding.api.DataObjectModification;
29 import org.opendaylight.controller.md.sal.binding.api.DataTreeIdentifier;
30 import org.opendaylight.controller.md.sal.binding.api.DataTreeModification;
31 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
32 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
33 import org.opendaylight.controller.md.sal.dom.api.DOMMountPointService;
34 import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
35 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
36 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceRegistration;
37 import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier;
38 import org.opendaylight.netconf.client.NetconfClientDispatcher;
39 import org.opendaylight.netconf.topology.singleton.api.NetconfTopologySingletonService;
40 import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup;
41 import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup.NetconfTopologySetupBuilder;
42 import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologyUtils;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNode;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.topology.singleton.config.rev170419.Config;
45 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology;
46 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopologyBuilder;
47 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
48 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.TopologyId;
49 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
50 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyBuilder;
51 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey;
52 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
53 import org.opendaylight.yangtools.concepts.ListenerRegistration;
54 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
55 import org.slf4j.Logger;
56 import org.slf4j.LoggerFactory;
57 import scala.concurrent.duration.Duration;
58
59 public class NetconfTopologyManager
60         implements ClusteredDataTreeChangeListener<Node>, NetconfTopologySingletonService, AutoCloseable {
61
62     private static final Logger LOG = LoggerFactory.getLogger(NetconfTopologyManager.class);
63
64     private final Map<InstanceIdentifier<Node>, NetconfTopologyContext> contexts = new HashMap<>();
65     private final Map<InstanceIdentifier<Node>, ClusterSingletonServiceRegistration>
66             clusterRegistrations = new HashMap<>();
67
68     private final DataBroker dataBroker;
69     private final RpcProviderRegistry rpcProviderRegistry;
70     private final ClusterSingletonServiceProvider clusterSingletonServiceProvider;
71     private final ScheduledThreadPool keepaliveExecutor;
72     private final ThreadPool processingExecutor;
73     private final ActorSystem actorSystem;
74     private final EventExecutor eventExecutor;
75     private final NetconfClientDispatcher clientDispatcher;
76     private final String topologyId;
77     private final Duration writeTxIdleTimeout;
78     private final DOMMountPointService mountPointService;
79     private final AAAEncryptionService encryptionService;
80     private ListenerRegistration<NetconfTopologyManager> dataChangeListenerRegistration;
81
82     public NetconfTopologyManager(final DataBroker dataBroker, final RpcProviderRegistry rpcProviderRegistry,
83                                   final ClusterSingletonServiceProvider clusterSingletonServiceProvider,
84                                   final ScheduledThreadPool keepaliveExecutor, final ThreadPool processingExecutor,
85                                   final ActorSystemProvider actorSystemProvider,
86                                   final EventExecutor eventExecutor, final NetconfClientDispatcher clientDispatcher,
87                                   final String topologyId, final Config config,
88                                   final DOMMountPointService mountPointService,
89                                   final AAAEncryptionService encryptionService) {
90
91         this.dataBroker = Preconditions.checkNotNull(dataBroker);
92         this.rpcProviderRegistry = Preconditions.checkNotNull(rpcProviderRegistry);
93         this.clusterSingletonServiceProvider = Preconditions.checkNotNull(clusterSingletonServiceProvider);
94         this.keepaliveExecutor = Preconditions.checkNotNull(keepaliveExecutor);
95         this.processingExecutor = Preconditions.checkNotNull(processingExecutor);
96         this.actorSystem = Preconditions.checkNotNull(actorSystemProvider).getActorSystem();
97         this.eventExecutor = Preconditions.checkNotNull(eventExecutor);
98         this.clientDispatcher = Preconditions.checkNotNull(clientDispatcher);
99         this.topologyId = Preconditions.checkNotNull(topologyId);
100         this.writeTxIdleTimeout = Duration.apply(config.getWriteTransactionIdleTimeout(), TimeUnit.SECONDS);
101         this.mountPointService = mountPointService;
102         this.encryptionService = Preconditions.checkNotNull(encryptionService);
103     }
104
105     // Blueprint init method
106     public void init() {
107         dataChangeListenerRegistration = registerDataTreeChangeListener(topologyId);
108     }
109
110     @Override
111     public void onDataTreeChanged(@Nonnull final Collection<DataTreeModification<Node>> changes) {
112         for (final DataTreeModification<Node> change : changes) {
113             final DataObjectModification<Node> rootNode = change.getRootNode();
114             final InstanceIdentifier<Node> dataModifIdent = change.getRootPath().getRootIdentifier();
115             final NodeId nodeId = NetconfTopologyUtils.getNodeId(rootNode.getIdentifier());
116             switch (rootNode.getModificationType()) {
117                 case SUBTREE_MODIFIED:
118                     LOG.debug("Config for node {} updated", nodeId);
119                     refreshNetconfDeviceContext(dataModifIdent, rootNode.getDataAfter());
120                     break;
121                 case WRITE:
122                     if (contexts.containsKey(dataModifIdent)) {
123                         LOG.debug("RemoteDevice{{}} was already configured, reconfiguring node...", nodeId);
124                         refreshNetconfDeviceContext(dataModifIdent, rootNode.getDataAfter());
125                     } else {
126                         LOG.debug("Config for node {} created", nodeId);
127                         startNetconfDeviceContext(dataModifIdent, rootNode.getDataAfter());
128                     }
129                     break;
130                 case DELETE:
131                     LOG.debug("Config for node {} deleted", nodeId);
132                     stopNetconfDeviceContext(dataModifIdent);
133                     break;
134                 default:
135                     LOG.warn("Unknown operation for {}.", nodeId);
136             }
137         }
138     }
139
140     private void refreshNetconfDeviceContext(final InstanceIdentifier<Node> instanceIdentifier, final Node node) {
141         final NetconfTopologyContext context = contexts.get(instanceIdentifier);
142         context.refresh(createSetup(instanceIdentifier, node));
143     }
144
145     // ClusterSingletonServiceRegistration registerClusterSingletonService method throws a Runtime exception if there
146     // are problems with registration and client has to deal with it. Only thing we can do if this error occurs is to
147     // retry registration several times and log the error.
148     // TODO change to a specific documented Exception when changed in ClusterSingletonServiceProvider
149     @SuppressWarnings("checkstyle:IllegalCatch")
150     private void startNetconfDeviceContext(final InstanceIdentifier<Node> instanceIdentifier, final Node node) {
151         final NetconfNode netconfNode = node.getAugmentation(NetconfNode.class);
152         Preconditions.checkNotNull(netconfNode);
153         Preconditions.checkNotNull(netconfNode.getHost());
154         Preconditions.checkNotNull(netconfNode.getHost().getIpAddress());
155
156         final Timeout actorResponseWaitTime = new Timeout(Duration.create(netconfNode.getActorResponseWaitTime(),
157                 "seconds"));
158
159         final ServiceGroupIdentifier serviceGroupIdent =
160                 ServiceGroupIdentifier.create(instanceIdentifier.toString());
161
162         final NetconfTopologyContext newNetconfTopologyContext =
163                 new NetconfTopologyContext(createSetup(instanceIdentifier, node), serviceGroupIdent,
164                         actorResponseWaitTime, mountPointService);
165
166         int tries = 3;
167         while (true) {
168             try {
169                 final ClusterSingletonServiceRegistration clusterSingletonServiceRegistration =
170                         clusterSingletonServiceProvider.registerClusterSingletonService(newNetconfTopologyContext);
171                 clusterRegistrations.put(instanceIdentifier, clusterSingletonServiceRegistration);
172                 contexts.put(instanceIdentifier, newNetconfTopologyContext);
173                 break;
174             } catch (final RuntimeException e) {
175                 LOG.warn("Unable to register cluster singleton service {}, trying again", newNetconfTopologyContext, e);
176
177                 if (--tries <= 0) {
178                     LOG.error("Unable to register cluster singleton service {} - done trying, closing topology context",
179                             newNetconfTopologyContext, e);
180                     close();
181                     break;
182                 }
183             }
184         }
185
186     }
187
188     @SuppressWarnings("checkstyle:IllegalCatch")
189     private void stopNetconfDeviceContext(final InstanceIdentifier<Node> instanceIdentifier) {
190         if (contexts.containsKey(instanceIdentifier)) {
191             try {
192                 clusterRegistrations.get(instanceIdentifier).close();
193                 contexts.get(instanceIdentifier).closeFinal();
194             } catch (final Exception e) {
195                 LOG.warn("Error at closing topology context. InstanceIdentifier: " + instanceIdentifier);
196             }
197             contexts.remove(instanceIdentifier);
198             clusterRegistrations.remove(instanceIdentifier);
199         }
200     }
201
202     @SuppressWarnings("checkstyle:IllegalCatch")
203     @Override
204     public void close() {
205         if (dataChangeListenerRegistration != null) {
206             dataChangeListenerRegistration.close();
207             dataChangeListenerRegistration = null;
208         }
209         contexts.forEach((instanceIdentifier, netconfTopologyContext) -> {
210             try {
211                 netconfTopologyContext.closeFinal();
212             } catch (final Exception e) {
213                 LOG.error("Error at closing topology context. InstanceIdentifier: " + instanceIdentifier, e);
214             }
215         });
216         clusterRegistrations.forEach((instanceIdentifier, clusterSingletonServiceRegistration) -> {
217             try {
218                 clusterSingletonServiceRegistration.close();
219             } catch (final Exception e) {
220                 LOG.error("Error at unregistering from cluster. InstanceIdentifier: " + instanceIdentifier, e);
221             }
222         });
223         contexts.clear();
224         clusterRegistrations.clear();
225     }
226
227     private ListenerRegistration<NetconfTopologyManager> registerDataTreeChangeListener(final String topologyId) {
228         final WriteTransaction wtx = dataBroker.newWriteOnlyTransaction();
229         initTopology(wtx, LogicalDatastoreType.CONFIGURATION, topologyId);
230         initTopology(wtx, LogicalDatastoreType.OPERATIONAL, topologyId);
231         Futures.addCallback(wtx.submit(), new FutureCallback<Void>() {
232             @Override
233             public void onSuccess(final Void result) {
234                 LOG.debug("topology initialization successful");
235             }
236
237             @Override
238             public void onFailure(@Nonnull final Throwable throwable) {
239                 LOG.error("Unable to initialize netconf-topology, {}", throwable);
240             }
241         });
242
243         LOG.debug("Registering datastore listener");
244         return dataBroker.registerDataTreeChangeListener(
245                 new DataTreeIdentifier<>(LogicalDatastoreType.CONFIGURATION,
246                         NetconfTopologyUtils.createTopologyListPath(topologyId).child(Node.class)), this);
247     }
248
249     private void initTopology(final WriteTransaction wtx, final LogicalDatastoreType datastoreType,
250                               final String topologyId) {
251         final NetworkTopology networkTopology = new NetworkTopologyBuilder().build();
252         final InstanceIdentifier<NetworkTopology> networkTopologyId =
253                 InstanceIdentifier.builder(NetworkTopology.class).build();
254         wtx.merge(datastoreType, networkTopologyId, networkTopology);
255         final Topology topology = new TopologyBuilder().setTopologyId(new TopologyId(topologyId)).build();
256         wtx.merge(datastoreType, networkTopologyId.child(Topology.class,
257                 new TopologyKey(new TopologyId(topologyId))), topology);
258     }
259
260     private NetconfTopologySetup createSetup(final InstanceIdentifier<Node> instanceIdentifier, final Node node) {
261         final NetconfTopologySetupBuilder builder = NetconfTopologySetupBuilder.create()
262                 .setClusterSingletonServiceProvider(clusterSingletonServiceProvider)
263                 .setDataBroker(dataBroker)
264                 .setInstanceIdentifier(instanceIdentifier)
265                 .setRpcProviderRegistry(rpcProviderRegistry)
266                 .setNode(node)
267                 .setActorSystem(actorSystem)
268                 .setEventExecutor(eventExecutor)
269                 .setKeepaliveExecutor(keepaliveExecutor)
270                 .setProcessingExecutor(processingExecutor)
271                 .setTopologyId(topologyId)
272                 .setNetconfClientDispatcher(clientDispatcher)
273                 .setSchemaResourceDTO(NetconfTopologyUtils.setupSchemaCacheDTO(node))
274                 .setIdleTimeout(writeTxIdleTimeout)
275                 .setEncryptionService(encryptionService);
276
277         return builder.build();
278     }
279 }