BUG-732 Fix netconf client deadlock when downloading yang schemas from remote device
[controller.git] / opendaylight / md-sal / sal-netconf-connector / src / main / java / org / opendaylight / controller / sal / connect / netconf / NetconfDevice.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.sal.connect.netconf;
9
10 import static com.google.common.base.Preconditions.checkState;
11 import static org.opendaylight.controller.sal.connect.netconf.InventoryUtils.INVENTORY_CONNECTED;
12 import static org.opendaylight.controller.sal.connect.netconf.InventoryUtils.INVENTORY_ID;
13 import static org.opendaylight.controller.sal.connect.netconf.InventoryUtils.INVENTORY_NODE;
14 import static org.opendaylight.controller.sal.connect.netconf.InventoryUtils.INVENTORY_PATH;
15 import static org.opendaylight.controller.sal.connect.netconf.InventoryUtils.NETCONF_INVENTORY_INITIAL_CAPABILITY;
16 import static org.opendaylight.controller.sal.connect.netconf.NetconfMapping.CONFIG_SOURCE_RUNNING;
17 import static org.opendaylight.controller.sal.connect.netconf.NetconfMapping.NETCONF_DATA_QNAME;
18 import static org.opendaylight.controller.sal.connect.netconf.NetconfMapping.NETCONF_GET_CONFIG_QNAME;
19 import static org.opendaylight.controller.sal.connect.netconf.NetconfMapping.NETCONF_GET_QNAME;
20 import static org.opendaylight.controller.sal.connect.netconf.NetconfMapping.toFilterStructure;
21 import static org.opendaylight.controller.sal.connect.netconf.NetconfMapping.toRpcMessage;
22 import static org.opendaylight.controller.sal.connect.netconf.NetconfMapping.wrap;
23
24 import java.io.InputStream;
25 import java.net.InetSocketAddress;
26 import java.net.URI;
27 import java.util.ArrayList;
28 import java.util.Arrays;
29 import java.util.Collection;
30 import java.util.Collections;
31 import java.util.List;
32 import java.util.Set;
33 import java.util.concurrent.ExecutionException;
34 import java.util.concurrent.ExecutorService;
35
36 import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
37 import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler;
38 import org.opendaylight.controller.md.sal.common.api.data.DataModification;
39 import org.opendaylight.controller.md.sal.common.api.data.DataReader;
40 import org.opendaylight.controller.netconf.client.NetconfClientDispatcher;
41 import org.opendaylight.controller.sal.core.api.Broker.ProviderSession;
42 import org.opendaylight.controller.sal.core.api.Broker.RpcRegistration;
43 import org.opendaylight.controller.sal.core.api.Provider;
44 import org.opendaylight.controller.sal.core.api.RpcImplementation;
45 import org.opendaylight.controller.sal.core.api.data.DataBrokerService;
46 import org.opendaylight.controller.sal.core.api.data.DataModificationTransaction;
47 import org.opendaylight.controller.sal.core.api.mount.MountProvisionInstance;
48 import org.opendaylight.controller.sal.core.api.mount.MountProvisionService;
49 import org.opendaylight.protocol.framework.ReconnectStrategy;
50 import org.opendaylight.yangtools.concepts.Registration;
51 import org.opendaylight.yangtools.yang.common.QName;
52 import org.opendaylight.yangtools.yang.common.RpcResult;
53 import org.opendaylight.yangtools.yang.data.api.CompositeNode;
54 import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
55 import org.opendaylight.yangtools.yang.data.api.Node;
56 import org.opendaylight.yangtools.yang.data.api.SimpleNode;
57 import org.opendaylight.yangtools.yang.data.impl.CompositeNodeTOImpl;
58 import org.opendaylight.yangtools.yang.data.impl.ImmutableCompositeNode;
59 import org.opendaylight.yangtools.yang.data.impl.SimpleNodeTOImpl;
60 import org.opendaylight.yangtools.yang.data.impl.util.CompositeNodeBuilder;
61 import org.opendaylight.yangtools.yang.model.api.Module;
62 import org.opendaylight.yangtools.yang.model.api.RpcDefinition;
63 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
64 import org.opendaylight.yangtools.yang.model.util.repo.AbstractCachingSchemaSourceProvider;
65 import org.opendaylight.yangtools.yang.model.util.repo.SchemaSourceProvider;
66 import org.opendaylight.yangtools.yang.parser.impl.YangParserImpl;
67 import org.opendaylight.yangtools.yang.parser.impl.util.YangSourceContext;
68 import org.slf4j.Logger;
69 import org.slf4j.LoggerFactory;
70
71 import com.google.common.base.Function;
72 import com.google.common.base.Optional;
73 import com.google.common.base.Predicate;
74 import com.google.common.collect.FluentIterable;
75 import com.google.common.collect.Iterables;
76 import com.google.common.util.concurrent.ListenableFuture;
77 import io.netty.util.concurrent.EventExecutor;
78
79 public class NetconfDevice implements Provider, //
80         DataReader<InstanceIdentifier, CompositeNode>, //
81         DataCommitHandler<InstanceIdentifier, CompositeNode>, //
82         RpcImplementation, //
83         AutoCloseable {
84
85     InetSocketAddress socketAddress;
86
87     MountProvisionInstance mountInstance;
88
89     EventExecutor eventExecutor;
90
91     ExecutorService processingExecutor;
92
93     InstanceIdentifier path;
94
95     ReconnectStrategy reconnectStrategy;
96
97     AbstractCachingSchemaSourceProvider<String, InputStream> schemaSourceProvider;
98
99     private NetconfDeviceSchemaContextProvider deviceContextProvider;
100
101     protected Logger logger;
102
103     Registration<DataReader<InstanceIdentifier, CompositeNode>> operReaderReg;
104     Registration<DataReader<InstanceIdentifier, CompositeNode>> confReaderReg;
105     Registration<DataCommitHandler<InstanceIdentifier, CompositeNode>> commitHandlerReg;
106     List<RpcRegistration> rpcReg;
107
108     String name;
109
110     MountProvisionService mountService;
111
112     NetconfClientDispatcher dispatcher;
113
114     static InstanceIdentifier ROOT_PATH = InstanceIdentifier.builder().toInstance();
115
116     SchemaSourceProvider<InputStream> remoteSourceProvider;
117
118     DataBrokerService dataBroker;
119
120     NetconfDeviceListener listener;
121
122     public NetconfDevice(String name) {
123         this.name = name;
124         this.logger = LoggerFactory.getLogger(NetconfDevice.class + "#" + name);
125         this.path = InstanceIdentifier.builder(INVENTORY_PATH)
126                 .nodeWithKey(INVENTORY_NODE, Collections.<QName, Object>singletonMap(INVENTORY_ID, name)).toInstance();
127     }
128
129     public void start() {
130         checkState(dispatcher != null, "Dispatcher must be set.");
131         checkState(schemaSourceProvider != null, "Schema Source Provider must be set.");
132         checkState(eventExecutor != null, "Event executor must be set.");
133
134         listener = new NetconfDeviceListener(this);
135
136         logger.info("Starting NETCONF Client {} for address {}", name, socketAddress);
137
138         dispatcher.createClient(socketAddress, listener, reconnectStrategy);
139     }
140
141     Optional<SchemaContext> getSchemaContext() {
142         if (deviceContextProvider == null) {
143             return Optional.absent();
144         }
145         return deviceContextProvider.currentContext;
146     }
147
148     void bringDown() {
149         if (rpcReg != null) {
150             for (RpcRegistration reg : rpcReg) {
151                 reg.close();
152             }
153             rpcReg = null;
154         }
155         closeGracefully(confReaderReg);
156         confReaderReg = null;
157         closeGracefully(operReaderReg);
158         operReaderReg = null;
159         closeGracefully(commitHandlerReg);
160         commitHandlerReg = null;
161
162         updateDeviceState(false, Collections.<QName> emptySet());
163     }
164
165     private void closeGracefully(final AutoCloseable resource) {
166         if (resource != null) {
167             try {
168                 resource.close();
169             } catch (Exception e) {
170                 logger.warn("Ignoring exception while closing {}", resource, e);
171             }
172         }
173     }
174
175     void bringUp(final SchemaSourceProvider<String> delegate, final Set<QName> capabilities) {
176         // This has to be called from separate thread, not from netty thread calling onSessionUp in DeviceListener.
177         // Reason: delegate.getSchema blocks thread when waiting for response
178         // however, if the netty thread is blocked, no incoming message can be processed
179         // ... netty should pick another thread from pool to process incoming message, but it does not http://netty.io/wiki/thread-model.html
180         // TODO redesign +refactor
181         processingExecutor.submit(new Runnable() {
182             @Override
183             public void run() {
184                 remoteSourceProvider = schemaSourceProvider.createInstanceFor(delegate);
185                 deviceContextProvider = new NetconfDeviceSchemaContextProvider(NetconfDevice.this, remoteSourceProvider);
186                 deviceContextProvider.createContextFromCapabilities(capabilities);
187                 if (mountInstance != null && getSchemaContext().isPresent()) {
188                     mountInstance.setSchemaContext(getSchemaContext().get());
189                 }
190
191                 updateDeviceState(true, capabilities);
192
193                 if (mountInstance != null) {
194                     confReaderReg = mountInstance.registerConfigurationReader(ROOT_PATH, NetconfDevice.this);
195                     operReaderReg = mountInstance.registerOperationalReader(ROOT_PATH, NetconfDevice.this);
196                     commitHandlerReg = mountInstance.registerCommitHandler(ROOT_PATH, NetconfDevice.this);
197
198                     List<RpcRegistration> rpcs = new ArrayList<>();
199                     // TODO same condition twice
200                     if (mountInstance != null && getSchemaContext().isPresent()) {
201                         for (RpcDefinition rpc : mountInstance.getSchemaContext().getOperations()) {
202                             rpcs.add(mountInstance.addRpcImplementation(rpc.getQName(), NetconfDevice.this));
203                         }
204                     }
205                     rpcReg = rpcs;
206                 }
207             }
208         });
209     }
210
211     private void updateDeviceState(boolean up, Set<QName> capabilities) {
212         DataModificationTransaction transaction = dataBroker.beginTransaction();
213
214         CompositeNodeBuilder<ImmutableCompositeNode> it = ImmutableCompositeNode.builder();
215         it.setQName(INVENTORY_NODE);
216         it.addLeaf(INVENTORY_ID, name);
217         it.addLeaf(INVENTORY_CONNECTED, up);
218
219         logger.debug("Client capabilities {}", capabilities);
220         for (QName capability : capabilities) {
221             it.addLeaf(NETCONF_INVENTORY_INITIAL_CAPABILITY, capability);
222         }
223
224         logger.debug("Update device state transaction " + transaction.getIdentifier()
225                 + " putting operational data started.");
226         transaction.removeOperationalData(path);
227         transaction.putOperationalData(path, it.toInstance());
228         logger.debug("Update device state transaction " + transaction.getIdentifier()
229                 + " putting operational data ended.");
230
231         // FIXME: this has to be asynchronous
232         RpcResult<TransactionStatus> transactionStatus = null;
233         try {
234             transactionStatus = transaction.commit().get();
235         } catch (InterruptedException e) {
236             throw new RuntimeException("Interrupted while waiting for response", e);
237         } catch (ExecutionException e) {
238             throw new RuntimeException("Read configuration data " + path + " failed", e);
239         }
240         // TODO better ex handling
241
242         if (transactionStatus.isSuccessful()) {
243             logger.debug("Update device state transaction " + transaction.getIdentifier() + " SUCCESSFUL.");
244         } else {
245             logger.debug("Update device state transaction " + transaction.getIdentifier() + " FAILED!");
246             logger.debug("Update device state transaction status " + transaction.getStatus());
247         }
248     }
249
250     @Override
251     public CompositeNode readConfigurationData(InstanceIdentifier path) {
252         RpcResult<CompositeNode> result = null;
253         try {
254             result = this.invokeRpc(NETCONF_GET_CONFIG_QNAME,
255                     wrap(NETCONF_GET_CONFIG_QNAME, CONFIG_SOURCE_RUNNING, toFilterStructure(path))).get();
256         } catch (InterruptedException e) {
257             throw new RuntimeException("Interrupted while waiting for response", e);
258         } catch (ExecutionException e) {
259             throw new RuntimeException("Read configuration data " + path + " failed", e);
260         }
261
262         CompositeNode data = result.getResult().getFirstCompositeByName(NETCONF_DATA_QNAME);
263         return data == null ? null : (CompositeNode) findNode(data, path);
264     }
265
266     @Override
267     public CompositeNode readOperationalData(InstanceIdentifier path) {
268         RpcResult<CompositeNode> result = null;
269         try {
270             result = invokeRpc(NETCONF_GET_QNAME, wrap(NETCONF_GET_QNAME, toFilterStructure(path))).get();
271         } catch (InterruptedException e) {
272             throw new RuntimeException("Interrupted while waiting for response", e);
273         } catch (ExecutionException e) {
274             throw new RuntimeException("Read configuration data " + path + " failed", e);
275         }
276
277         CompositeNode data = result.getResult().getFirstCompositeByName(NETCONF_DATA_QNAME);
278         return (CompositeNode) findNode(data, path);
279     }
280
281     @Override
282     public Set<QName> getSupportedRpcs() {
283         return Collections.emptySet();
284     }
285
286     @Override
287     public ListenableFuture<RpcResult<CompositeNode>> invokeRpc(QName rpc, CompositeNode input) {
288         return listener.sendRequest(toRpcMessage(rpc, input, getSchemaContext()));
289     }
290
291     @Override
292     public Collection<ProviderFunctionality> getProviderFunctionality() {
293         return Collections.emptySet();
294     }
295
296     @Override
297     public void onSessionInitiated(ProviderSession session) {
298         dataBroker = session.getService(DataBrokerService.class);
299
300         DataModificationTransaction transaction = dataBroker.beginTransaction();
301         if (operationalNodeNotExisting(transaction)) {
302             transaction.putOperationalData(path, getNodeWithId());
303         }
304         if (configurationNodeNotExisting(transaction)) {
305             transaction.putConfigurationData(path, getNodeWithId());
306         }
307
308         try {
309             transaction.commit().get();
310         } catch (InterruptedException e) {
311             throw new RuntimeException("Interrupted while waiting for response", e);
312         } catch (ExecutionException e) {
313             throw new RuntimeException("Read configuration data " + path + " failed", e);
314         }
315
316         mountService = session.getService(MountProvisionService.class);
317         if (mountService != null) {
318             mountInstance = mountService.createOrGetMountPoint(path);
319         }
320     }
321
322     CompositeNode getNodeWithId() {
323         SimpleNodeTOImpl id = new SimpleNodeTOImpl(INVENTORY_ID, null, name);
324         return new CompositeNodeTOImpl(INVENTORY_NODE, null, Collections.<Node<?>> singletonList(id));
325     }
326
327     boolean configurationNodeNotExisting(DataModificationTransaction transaction) {
328         return null == transaction.readConfigurationData(path);
329     }
330
331     boolean operationalNodeNotExisting(DataModificationTransaction transaction) {
332         return null == transaction.readOperationalData(path);
333     }
334
335     static Node<?> findNode(CompositeNode node, InstanceIdentifier identifier) {
336
337         Node<?> current = node;
338         for (InstanceIdentifier.PathArgument arg : identifier.getPath()) {
339             if (current instanceof SimpleNode<?>) {
340                 return null;
341             } else if (current instanceof CompositeNode) {
342                 CompositeNode currentComposite = (CompositeNode) current;
343
344                 current = currentComposite.getFirstCompositeByName(arg.getNodeType());
345                 if (current == null) {
346                     current = currentComposite.getFirstCompositeByName(arg.getNodeType().withoutRevision());
347                 }
348                 if (current == null) {
349                     current = currentComposite.getFirstSimpleByName(arg.getNodeType());
350                 }
351                 if (current == null) {
352                     current = currentComposite.getFirstSimpleByName(arg.getNodeType().withoutRevision());
353                 }
354                 if (current == null) {
355                     return null;
356                 }
357             }
358         }
359         return current;
360     }
361
362     @Override
363     public DataCommitTransaction<InstanceIdentifier, CompositeNode> requestCommit(
364             DataModification<InstanceIdentifier, CompositeNode> modification) {
365         NetconfDeviceTwoPhaseCommitTransaction twoPhaseCommit = new NetconfDeviceTwoPhaseCommitTransaction(this,
366                 modification, true);
367         try {
368             twoPhaseCommit.prepare();
369         } catch (InterruptedException e) {
370             throw new RuntimeException("Interrupted while waiting for response", e);
371         } catch (ExecutionException e) {
372             throw new RuntimeException("Read configuration data " + path + " failed", e);
373         }
374          return twoPhaseCommit;
375     }
376
377     Set<QName> getCapabilities(Collection<String> capabilities) {
378         return FluentIterable.from(capabilities).filter(new Predicate<String>() {
379             @Override
380             public boolean apply(final String capability) {
381                 return capability.contains("?") && capability.contains("module=") && capability.contains("revision=");
382             }
383         }).transform(new Function<String, QName>() {
384             @Override
385             public QName apply(final String capability) {
386                 String[] parts = capability.split("\\?");
387                 String namespace = parts[0];
388                 FluentIterable<String> queryParams = FluentIterable.from(Arrays.asList(parts[1].split("&")));
389
390                 String revision = getStringAndTransform(queryParams, "revision=", "revision=");
391
392                 String moduleName = getStringAndTransform(queryParams, "module=", "module=");
393
394                 if (revision == null) {
395                     logger.warn("Netconf device was not reporting revision correctly, trying to get amp;revision=");
396                     revision = getStringAndTransform(queryParams, "amp;revision==", "revision=");
397
398                     if (revision != null) {
399                         logger.warn("Netconf device returned revision incorectly escaped for {}", capability);
400                     }
401                 }
402                 if (revision == null) {
403                     return QName.create(URI.create(namespace), null, moduleName);
404                 }
405                 return QName.create(namespace, revision, moduleName);
406             }
407
408             private String getStringAndTransform(final Iterable<String> queryParams, final String match,
409                     final String substringToRemove) {
410                 Optional<String> found = Iterables.tryFind(queryParams, new Predicate<String>() {
411                     @Override
412                     public boolean apply(final String input) {
413                         return input.startsWith(match);
414                     }
415                 });
416
417                 return found.isPresent() ? found.get().replaceAll(substringToRemove, "") : null;
418             }
419
420         }).toSet();
421     }
422
423     @Override
424     public void close() {
425         bringDown();
426     }
427
428     public String getName() {
429         return name;
430     }
431
432     public InetSocketAddress getSocketAddress() {
433         return socketAddress;
434     }
435
436     public MountProvisionInstance getMountInstance() {
437         return mountInstance;
438     }
439
440     public void setReconnectStrategy(final ReconnectStrategy reconnectStrategy) {
441         this.reconnectStrategy = reconnectStrategy;
442     }
443
444     public void setProcessingExecutor(final ExecutorService processingExecutor) {
445         this.processingExecutor = processingExecutor;
446     }
447
448     public void setSocketAddress(final InetSocketAddress socketAddress) {
449         this.socketAddress = socketAddress;
450     }
451
452     public void setEventExecutor(final EventExecutor eventExecutor) {
453         this.eventExecutor = eventExecutor;
454     }
455
456     public void setSchemaSourceProvider(final AbstractCachingSchemaSourceProvider<String, InputStream> schemaSourceProvider) {
457         this.schemaSourceProvider = schemaSourceProvider;
458     }
459
460     public void setDispatcher(final NetconfClientDispatcher dispatcher) {
461         this.dispatcher = dispatcher;
462     }
463 }
464
465 class NetconfDeviceSchemaContextProvider {
466
467     NetconfDevice device;
468
469     SchemaSourceProvider<InputStream> sourceProvider;
470
471     Optional<SchemaContext> currentContext;
472
473     NetconfDeviceSchemaContextProvider(NetconfDevice device, SchemaSourceProvider<InputStream> sourceProvider) {
474         this.device = device;
475         this.sourceProvider = sourceProvider;
476         this.currentContext = Optional.absent();
477     }
478
479     void createContextFromCapabilities(Iterable<QName> capabilities) {
480         YangSourceContext sourceContext = YangSourceContext.createFrom(capabilities, sourceProvider);
481         if (!sourceContext.getMissingSources().isEmpty()) {
482             device.logger.warn("Sources for following models are missing {}", sourceContext.getMissingSources());
483         }
484         device.logger.debug("Trying to create schema context from {}", sourceContext.getValidSources());
485         List<InputStream> modelsToParse = YangSourceContext.getValidInputStreams(sourceContext);
486         if (!sourceContext.getValidSources().isEmpty()) {
487             SchemaContext schemaContext = tryToCreateContext(modelsToParse);
488             currentContext = Optional.fromNullable(schemaContext);
489         } else {
490             currentContext = Optional.absent();
491         }
492         if (currentContext.isPresent()) {
493             device.logger.debug("Schema context successfully created.");
494         }
495     }
496
497     SchemaContext tryToCreateContext(List<InputStream> modelsToParse) {
498         YangParserImpl parser = new YangParserImpl();
499         try {
500
501             Set<Module> models = parser.parseYangModelsFromStreams(modelsToParse);
502             return parser.resolveSchemaContext(models);
503         } catch (Exception e) {
504             device.logger.debug("Error occured during parsing YANG schemas", e);
505             return null;
506         }
507     }
508 }