Merge "NETCONF-524 : Setting the netconf keepalive logic to be more proactive."
[netconf.git] / netconf / netconf-topology-singleton / src / main / java / org / opendaylight / netconf / topology / singleton / impl / NetconfTopologyManager.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.netconf.topology.singleton.impl;
10
11 import akka.actor.ActorSystem;
12 import akka.util.Timeout;
13 import com.google.common.annotations.VisibleForTesting;
14 import com.google.common.base.Preconditions;
15 import com.google.common.util.concurrent.FutureCallback;
16 import com.google.common.util.concurrent.MoreExecutors;
17 import io.netty.util.concurrent.EventExecutor;
18 import java.util.Collection;
19 import java.util.Map;
20 import java.util.concurrent.ConcurrentHashMap;
21 import java.util.concurrent.TimeUnit;
22 import javax.annotation.Nonnull;
23 import org.opendaylight.aaa.encrypt.AAAEncryptionService;
24 import org.opendaylight.controller.cluster.ActorSystemProvider;
25 import org.opendaylight.controller.config.threadpool.ScheduledThreadPool;
26 import org.opendaylight.controller.config.threadpool.ThreadPool;
27 import org.opendaylight.controller.md.sal.binding.api.ClusteredDataTreeChangeListener;
28 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
29 import org.opendaylight.controller.md.sal.binding.api.DataObjectModification;
30 import org.opendaylight.controller.md.sal.binding.api.DataTreeIdentifier;
31 import org.opendaylight.controller.md.sal.binding.api.DataTreeModification;
32 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
33 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
34 import org.opendaylight.controller.md.sal.dom.api.DOMMountPointService;
35 import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
36 import org.opendaylight.mdsal.common.api.CommitInfo;
37 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
38 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceRegistration;
39 import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier;
40 import org.opendaylight.netconf.client.NetconfClientDispatcher;
41 import org.opendaylight.netconf.topology.singleton.api.NetconfTopologySingletonService;
42 import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup;
43 import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup.NetconfTopologySetupBuilder;
44 import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologyUtils;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNode;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.topology.singleton.config.rev170419.Config;
47 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology;
48 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopologyBuilder;
49 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
50 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.TopologyId;
51 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
52 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyBuilder;
53 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey;
54 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
55 import org.opendaylight.yangtools.concepts.ListenerRegistration;
56 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
57 import org.slf4j.Logger;
58 import org.slf4j.LoggerFactory;
59 import scala.concurrent.duration.Duration;
60
61 public class NetconfTopologyManager
62         implements ClusteredDataTreeChangeListener<Node>, NetconfTopologySingletonService, AutoCloseable {
63
64     private static final Logger LOG = LoggerFactory.getLogger(NetconfTopologyManager.class);
65
66     private final Map<InstanceIdentifier<Node>, NetconfTopologyContext> contexts = new ConcurrentHashMap<>();
67     private final Map<InstanceIdentifier<Node>, ClusterSingletonServiceRegistration>
68             clusterRegistrations = new ConcurrentHashMap<>();
69
70     private final DataBroker dataBroker;
71     private final RpcProviderRegistry rpcProviderRegistry;
72     private final ClusterSingletonServiceProvider clusterSingletonServiceProvider;
73     private final ScheduledThreadPool keepaliveExecutor;
74     private final ThreadPool processingExecutor;
75     private final ActorSystem actorSystem;
76     private final EventExecutor eventExecutor;
77     private final NetconfClientDispatcher clientDispatcher;
78     private final String topologyId;
79     private final Duration writeTxIdleTimeout;
80     private final DOMMountPointService mountPointService;
81     private final AAAEncryptionService encryptionService;
82     private ListenerRegistration<NetconfTopologyManager> dataChangeListenerRegistration;
83     private String privateKeyPath;
84     private String privateKeyPassphrase;
85
86     public NetconfTopologyManager(final DataBroker dataBroker, final RpcProviderRegistry rpcProviderRegistry,
87                                   final ClusterSingletonServiceProvider clusterSingletonServiceProvider,
88                                   final ScheduledThreadPool keepaliveExecutor, final ThreadPool processingExecutor,
89                                   final ActorSystemProvider actorSystemProvider,
90                                   final EventExecutor eventExecutor, final NetconfClientDispatcher clientDispatcher,
91                                   final String topologyId, final Config config,
92                                   final DOMMountPointService mountPointService,
93                                   final AAAEncryptionService encryptionService) {
94
95         this.dataBroker = Preconditions.checkNotNull(dataBroker);
96         this.rpcProviderRegistry = Preconditions.checkNotNull(rpcProviderRegistry);
97         this.clusterSingletonServiceProvider = Preconditions.checkNotNull(clusterSingletonServiceProvider);
98         this.keepaliveExecutor = Preconditions.checkNotNull(keepaliveExecutor);
99         this.processingExecutor = Preconditions.checkNotNull(processingExecutor);
100         this.actorSystem = Preconditions.checkNotNull(actorSystemProvider).getActorSystem();
101         this.eventExecutor = Preconditions.checkNotNull(eventExecutor);
102         this.clientDispatcher = Preconditions.checkNotNull(clientDispatcher);
103         this.topologyId = Preconditions.checkNotNull(topologyId);
104         this.writeTxIdleTimeout = Duration.apply(config.getWriteTransactionIdleTimeout(), TimeUnit.SECONDS);
105         this.mountPointService = mountPointService;
106         this.encryptionService = Preconditions.checkNotNull(encryptionService);
107
108     }
109
110     // Blueprint init method
111     public void init() {
112         dataChangeListenerRegistration = registerDataTreeChangeListener();
113     }
114
115     @Override
116     public void onDataTreeChanged(@Nonnull final Collection<DataTreeModification<Node>> changes) {
117         for (final DataTreeModification<Node> change : changes) {
118             final DataObjectModification<Node> rootNode = change.getRootNode();
119             final InstanceIdentifier<Node> dataModifIdent = change.getRootPath().getRootIdentifier();
120             final NodeId nodeId = NetconfTopologyUtils.getNodeId(rootNode.getIdentifier());
121             switch (rootNode.getModificationType()) {
122                 case SUBTREE_MODIFIED:
123                     LOG.debug("Config for node {} updated", nodeId);
124                     refreshNetconfDeviceContext(dataModifIdent, rootNode.getDataAfter());
125                     break;
126                 case WRITE:
127                     if (contexts.containsKey(dataModifIdent)) {
128                         LOG.debug("RemoteDevice{{}} was already configured, reconfiguring node...", nodeId);
129                         refreshNetconfDeviceContext(dataModifIdent, rootNode.getDataAfter());
130                     } else {
131                         LOG.debug("Config for node {} created", nodeId);
132                         startNetconfDeviceContext(dataModifIdent, rootNode.getDataAfter());
133                     }
134                     break;
135                 case DELETE:
136                     LOG.debug("Config for node {} deleted", nodeId);
137                     stopNetconfDeviceContext(dataModifIdent);
138                     break;
139                 default:
140                     LOG.warn("Unknown operation for {}.", nodeId);
141             }
142         }
143     }
144
145     private void refreshNetconfDeviceContext(final InstanceIdentifier<Node> instanceIdentifier, final Node node) {
146         final NetconfTopologyContext context = contexts.get(instanceIdentifier);
147         context.refresh(createSetup(instanceIdentifier, node));
148     }
149
150     // ClusterSingletonServiceRegistration registerClusterSingletonService method throws a Runtime exception if there
151     // are problems with registration and client has to deal with it. Only thing we can do if this error occurs is to
152     // retry registration several times and log the error.
153     // TODO change to a specific documented Exception when changed in ClusterSingletonServiceProvider
154     @SuppressWarnings("checkstyle:IllegalCatch")
155     private void startNetconfDeviceContext(final InstanceIdentifier<Node> instanceIdentifier, final Node node) {
156         final NetconfNode netconfNode = node.augmentation(NetconfNode.class);
157         Preconditions.checkNotNull(netconfNode);
158         Preconditions.checkNotNull(netconfNode.getHost());
159         Preconditions.checkNotNull(netconfNode.getHost().getIpAddress());
160
161         final Timeout actorResponseWaitTime = new Timeout(Duration.create(netconfNode.getActorResponseWaitTime(),
162                 "seconds"));
163
164         final ServiceGroupIdentifier serviceGroupIdent =
165                 ServiceGroupIdentifier.create(instanceIdentifier.toString());
166
167         final NetconfTopologyContext newNetconfTopologyContext = newNetconfTopologyContext(
168                 createSetup(instanceIdentifier, node), serviceGroupIdent, actorResponseWaitTime);
169
170         int tries = 3;
171         while (true) {
172             try {
173                 final ClusterSingletonServiceRegistration clusterSingletonServiceRegistration =
174                         clusterSingletonServiceProvider.registerClusterSingletonService(newNetconfTopologyContext);
175                 clusterRegistrations.put(instanceIdentifier, clusterSingletonServiceRegistration);
176                 contexts.put(instanceIdentifier, newNetconfTopologyContext);
177                 break;
178             } catch (final RuntimeException e) {
179                 LOG.warn("Unable to register cluster singleton service {}, trying again", newNetconfTopologyContext, e);
180
181                 if (--tries <= 0) {
182                     LOG.error("Unable to register cluster singleton service {} - done trying, closing topology context",
183                             newNetconfTopologyContext, e);
184                     close(newNetconfTopologyContext);
185                     break;
186                 }
187             }
188         }
189     }
190
191     private void stopNetconfDeviceContext(final InstanceIdentifier<Node> instanceIdentifier) {
192         final NetconfTopologyContext netconfTopologyContext = contexts.remove(instanceIdentifier);
193         if (netconfTopologyContext != null) {
194             close(clusterRegistrations.remove(instanceIdentifier));
195             close(netconfTopologyContext);
196         }
197     }
198
199     @VisibleForTesting
200     protected NetconfTopologyContext newNetconfTopologyContext(NetconfTopologySetup setup,
201             ServiceGroupIdentifier serviceGroupIdent, Timeout actorResponseWaitTime) {
202         return new NetconfTopologyContext(setup, serviceGroupIdent, actorResponseWaitTime, mountPointService);
203     }
204
205     @Override
206     public void close() {
207         if (dataChangeListenerRegistration != null) {
208             dataChangeListenerRegistration.close();
209             dataChangeListenerRegistration = null;
210         }
211
212         contexts.values().forEach(NetconfTopologyManager::close);
213         clusterRegistrations.values().forEach(NetconfTopologyManager::close);
214
215         contexts.clear();
216         clusterRegistrations.clear();
217     }
218
219     @SuppressWarnings("checkstyle:IllegalCatch")
220     private static void close(AutoCloseable closeable) {
221         try {
222             closeable.close();
223         } catch (Exception e) {
224             LOG.warn("Error closing {}", closeable, e);
225         }
226     }
227
228     /**
229      * Sets the private key path from location specified in configuration file using blueprint.
230      */
231     public void setPrivateKeyPath(final String privateKeyPath) {
232         this.privateKeyPath = privateKeyPath;
233     }
234
235     /**
236      * Sets the private key passphrase from location specified in configuration file using blueprint.
237      */
238     public void setPrivateKeyPassphrase(final String privateKeyPassphrase) {
239         this.privateKeyPassphrase = privateKeyPassphrase;
240     }
241
242     private ListenerRegistration<NetconfTopologyManager> registerDataTreeChangeListener() {
243         final WriteTransaction wtx = dataBroker.newWriteOnlyTransaction();
244         initTopology(wtx, LogicalDatastoreType.CONFIGURATION);
245         initTopology(wtx, LogicalDatastoreType.OPERATIONAL);
246         wtx.commit().addCallback(new FutureCallback<CommitInfo>() {
247             @Override
248             public void onSuccess(final CommitInfo result) {
249                 LOG.debug("topology initialization successful");
250             }
251
252             @Override
253             public void onFailure(@Nonnull final Throwable throwable) {
254                 LOG.error("Unable to initialize netconf-topology, {}", throwable);
255             }
256         }, MoreExecutors.directExecutor());
257
258         LOG.debug("Registering datastore listener");
259         return dataBroker.registerDataTreeChangeListener(
260                 new DataTreeIdentifier<>(LogicalDatastoreType.CONFIGURATION,
261                         NetconfTopologyUtils.createTopologyListPath(topologyId).child(Node.class)), this);
262     }
263
264     private void initTopology(final WriteTransaction wtx, final LogicalDatastoreType datastoreType) {
265         final NetworkTopology networkTopology = new NetworkTopologyBuilder().build();
266         final InstanceIdentifier<NetworkTopology> networkTopologyId =
267                 InstanceIdentifier.builder(NetworkTopology.class).build();
268         wtx.merge(datastoreType, networkTopologyId, networkTopology);
269         final Topology topology = new TopologyBuilder().setTopologyId(new TopologyId(topologyId)).build();
270         wtx.merge(datastoreType, networkTopologyId.child(Topology.class,
271                 new TopologyKey(new TopologyId(topologyId))), topology);
272     }
273
274     private NetconfTopologySetup createSetup(final InstanceIdentifier<Node> instanceIdentifier, final Node node) {
275         final NetconfTopologySetupBuilder builder = NetconfTopologySetupBuilder.create()
276                 .setClusterSingletonServiceProvider(clusterSingletonServiceProvider)
277                 .setDataBroker(dataBroker)
278                 .setInstanceIdentifier(instanceIdentifier)
279                 .setRpcProviderRegistry(rpcProviderRegistry)
280                 .setNode(node)
281                 .setActorSystem(actorSystem)
282                 .setEventExecutor(eventExecutor)
283                 .setKeepaliveExecutor(keepaliveExecutor)
284                 .setProcessingExecutor(processingExecutor)
285                 .setTopologyId(topologyId)
286                 .setNetconfClientDispatcher(clientDispatcher)
287                 .setSchemaResourceDTO(NetconfTopologyUtils.setupSchemaCacheDTO(node))
288                 .setIdleTimeout(writeTxIdleTimeout)
289                 .setPrivateKeyPath(privateKeyPath)
290                 .setPrivateKeyPassphrase(privateKeyPassphrase)
291                 .setEncryptionService(encryptionService);
292
293         return builder.build();
294     }
295 }